diff --git a/cmd/bsdaily/main.go b/cmd/bsdaily/main.go
index a16ded0..fa565bf 100644
--- a/cmd/bsdaily/main.go
+++ b/cmd/bsdaily/main.go
@@ -17,22 +17,56 @@ func main() {
     slog.SetDefault(logger)
 
     var dateFlag string
+    var fromFlag string
+    var toFlag string
 
     rootCmd := &cobra.Command{
         Use:          "bsdaily",
         Short:        "Extract a single day's data from the latest daily snapshot",
         SilenceUsage: true,
         RunE: func(cmd *cobra.Command, args []string) error {
-            var targetDate *time.Time
-            if dateFlag != "" {
-                t, err := time.Parse("2006-01-02", dateFlag)
-                if err != nil {
-                    return fmt.Errorf("invalid date %q (expected YYYY-MM-DD): %w", dateFlag, err)
+            hasDate := dateFlag != ""
+            hasFrom := fromFlag != ""
+            hasTo := toFlag != ""
+
+            // Validate mutual exclusivity
+            if hasDate && (hasFrom || hasTo) {
+                return fmt.Errorf("--date and --from/--to are mutually exclusive")
+            }
+            if hasFrom != hasTo {
+                if hasFrom {
+                    return fmt.Errorf("--from requires --to")
                 }
-                targetDate = &t
+                return fmt.Errorf("--to requires --from")
             }
-            if err := bsdaily.Run(targetDate); err != nil {
+            var targetDates []time.Time
+
+            if hasDate {
+                t, err := time.Parse("2006-01-02", dateFlag)
+                if err != nil {
+                    return fmt.Errorf("invalid --date %q (expected YYYY-MM-DD): %w", dateFlag, err)
+                }
+                targetDates = []time.Time{t}
+            } else if hasFrom {
+                from, err := time.Parse("2006-01-02", fromFlag)
+                if err != nil {
+                    return fmt.Errorf("invalid --from %q (expected YYYY-MM-DD): %w", fromFlag, err)
+                }
+                to, err := time.Parse("2006-01-02", toFlag)
+                if err != nil {
+                    return fmt.Errorf("invalid --to %q (expected YYYY-MM-DD): %w", toFlag, err)
+                }
+                if from.After(to) {
+                    return fmt.Errorf("--from %s is after --to %s", fromFlag, toFlag)
+                }
+                for d := from; !d.After(to); d = d.AddDate(0, 0, 1) {
+                    targetDates = append(targetDates, d)
+                }
+            }
+            // else: targetDates remains nil → Run() defaults to snapshot date minus one
+
+            if err := bsdaily.Run(targetDates); err != nil {
                 return err
             }
             slog.Info("completed successfully")
@@ -41,6 +75,8 @@ func main() {
     }
 
     rootCmd.Flags().StringVarP(&dateFlag, "date", "d", "", "target date to extract (YYYY-MM-DD); defaults to snapshot date minus one day")
+    rootCmd.Flags().StringVar(&fromFlag, "from", "", "start of date range to extract (YYYY-MM-DD, inclusive); use with --to")
+    rootCmd.Flags().StringVar(&toFlag, "to", "", "end of date range to extract (YYYY-MM-DD, inclusive); use with --from")
 
     if err := rootCmd.Execute(); err != nil {
         os.Exit(1)
diff --git a/internal/bsdaily/config.go b/internal/bsdaily/config.go
index 85e23e3..df532ce 100644
--- a/internal/bsdaily/config.go
+++ b/internal/bsdaily/config.go
@@ -11,8 +11,7 @@ const (
     SHMFilename = "firehose.db-shm"
 
     MinTmpFreeBytes     = 500 * 1024 * 1024 * 1024 // 500 GB
-    MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024  // 20 GB
-    PostCopyTmpMinFree  = 100 * 1024 * 1024 * 1024 // 100 GB
+    MinDailiesFreeBytes = 20 * 1024 * 1024 * 1024 // 20 GB
 )
 
 var snapshotPattern = regexp.MustCompile(`^zfs-auto-snap_daily-(\d{4}-\d{2}-\d{2})-\d{4}$`)
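The flag handling above lives inline in RunE. As a rough sketch only, the same validation and range expansion could be factored into a helper placed alongside main.go so it can be unit-tested without cobra; resolveTargetDates is a hypothetical name and is not part of this change:

package main

import (
    "fmt"
    "time"
)

// resolveTargetDates (hypothetical helper, shown for illustration) mirrors the
// RunE logic above: it enforces --date vs --from/--to exclusivity, requires
// --from and --to together, and expands the inclusive range into one
// time.Time per day.
func resolveTargetDates(date, from, to string) ([]time.Time, error) {
    if date != "" && (from != "" || to != "") {
        return nil, fmt.Errorf("--date and --from/--to are mutually exclusive")
    }
    if (from != "") != (to != "") {
        return nil, fmt.Errorf("--from and --to must be used together")
    }
    if date != "" {
        t, err := time.Parse("2006-01-02", date)
        if err != nil {
            return nil, fmt.Errorf("invalid --date %q (expected YYYY-MM-DD): %w", date, err)
        }
        return []time.Time{t}, nil
    }
    if from == "" {
        // No flags: caller falls back to snapshot date minus one day.
        return nil, nil
    }
    f, err := time.Parse("2006-01-02", from)
    if err != nil {
        return nil, fmt.Errorf("invalid --from %q (expected YYYY-MM-DD): %w", from, err)
    }
    t, err := time.Parse("2006-01-02", to)
    if err != nil {
        return nil, fmt.Errorf("invalid --to %q (expected YYYY-MM-DD): %w", to, err)
    }
    if f.After(t) {
        return nil, fmt.Errorf("--from %s is after --to %s", from, to)
    }
    var dates []time.Time
    for d := f; !d.After(t); d = d.AddDate(0, 0, 1) {
        dates = append(dates, d)
    }
    return dates, nil
}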
diff --git a/internal/bsdaily/extract.go b/internal/bsdaily/extract.go
new file mode 100644
index 0000000..6444b22
--- /dev/null
+++ b/internal/bsdaily/extract.go
@@ -0,0 +1,139 @@
+package bsdaily
+
+import (
+    "database/sql"
+    "errors"
+    "fmt"
+    "log/slog"
+    "time"
+
+    _ "modernc.org/sqlite"
+)
+
+var ErrNoPosts = errors.New("no posts found for target day")
+
+// ExtractDay opens a new empty database at dstDBPath, attaches srcDBPath,
+// and copies only the target day's data into it. This is much faster than
+// pruning a full copy because it only reads/writes the small slice of data
+// being kept.
+func ExtractDay(srcDBPath, dstDBPath string, targetDay time.Time) error {
+    dayStart := targetDay.Format("2006-01-02") + "T00:00:00"
+    dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00"
+
+    slog.Info("extracting day", "from", dayStart, "until", dayEnd)
+
+    db, err := sql.Open("sqlite", dstDBPath+"?_pragma=journal_mode(MEMORY)&_pragma=synchronous(OFF)&_pragma=cache_size(-200000)&_pragma=foreign_keys(OFF)")
+    if err != nil {
+        return fmt.Errorf("opening destination database: %w", err)
+    }
+    defer db.Close()
+
+    // Attach source database
+    if _, err := db.Exec("ATTACH DATABASE ? AS src", srcDBPath); err != nil {
+        return fmt.Errorf("attaching source database: %w", err)
+    }
+
+    // Copy table DDL from source
+    slog.Info("copying table DDL from source")
+    rows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' ORDER BY name")
+    if err != nil {
+        return fmt.Errorf("reading source schema: %w", err)
+    }
+    defer rows.Close()
+
+    var ddlStatements []string
+    for rows.Next() {
+        var ddl string
+        if err := rows.Scan(&ddl); err != nil {
+            return fmt.Errorf("scanning DDL: %w", err)
+        }
+        ddlStatements = append(ddlStatements, ddl)
+    }
+    if err := rows.Err(); err != nil {
+        return fmt.Errorf("iterating DDL rows: %w", err)
+    }
+
+    for _, ddl := range ddlStatements {
+        if _, err := db.Exec(ddl); err != nil {
+            return fmt.Errorf("creating table: %w\nDDL: %s", err, ddl)
+        }
+    }
+
+    // Insert target day's data
+    slog.Info("inserting posts for target day")
+    result, err := db.Exec("INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? AND timestamp < ?", dayStart, dayEnd)
+    if err != nil {
+        return fmt.Errorf("inserting posts: %w", err)
+    }
+    postCount, _ := result.RowsAffected()
+    slog.Info("inserted posts", "count", postCount)
+
+    if postCount == 0 {
+        return fmt.Errorf("%w %s - aborting to avoid producing empty output",
+            ErrNoPosts, targetDay.Format("2006-01-02"))
+    }
+
+    slog.Info("inserting junction and lookup tables")
+    if _, err := db.Exec("INSERT INTO posts_hashtags SELECT * FROM src.posts_hashtags WHERE post_id IN (SELECT id FROM posts)"); err != nil {
+        return fmt.Errorf("inserting posts_hashtags: %w", err)
+    }
+
+    if _, err := db.Exec("INSERT INTO posts_urls SELECT * FROM src.posts_urls WHERE post_id IN (SELECT id FROM posts)"); err != nil {
+        return fmt.Errorf("inserting posts_urls: %w", err)
+    }
+
+    if _, err := db.Exec("INSERT INTO hashtags SELECT * FROM src.hashtags WHERE id IN (SELECT hashtag_id FROM posts_hashtags)"); err != nil {
+        return fmt.Errorf("inserting hashtags: %w", err)
+    }
+
+    if _, err := db.Exec("INSERT INTO urls SELECT * FROM src.urls WHERE id IN (SELECT url_id FROM posts_urls)"); err != nil {
+        return fmt.Errorf("inserting urls: %w", err)
+    }
+
+    if _, err := db.Exec("INSERT INTO users SELECT * FROM src.users WHERE did IN (SELECT user_did FROM posts)"); err != nil {
+        return fmt.Errorf("inserting users: %w", err)
+    }
+
+    // Create indexes after bulk insert for speed
+    slog.Info("creating indexes")
+    idxRows, err := db.Query("SELECT sql FROM src.sqlite_master WHERE type='index' AND name NOT LIKE 'sqlite_%' AND sql IS NOT NULL ORDER BY name")
+    if err != nil {
+        return fmt.Errorf("reading source indexes: %w", err)
+    }
+    defer idxRows.Close()
+
+    var idxStatements []string
+    for idxRows.Next() {
+        var idxSQL string
+        if err := idxRows.Scan(&idxSQL); err != nil {
+            return fmt.Errorf("scanning index DDL: %w", err)
+        }
+        idxStatements = append(idxStatements, idxSQL)
+    }
+    if err := idxRows.Err(); err != nil {
+        return fmt.Errorf("iterating index rows: %w", err)
+    }
+
+    for _, idxSQL := range idxStatements {
+        if _, err := db.Exec(idxSQL); err != nil {
+            return fmt.Errorf("creating index: %w\nDDL: %s", err, idxSQL)
+        }
+    }
+
+    // Detach source
+    if _, err := db.Exec("DETACH DATABASE src"); err != nil {
+        return fmt.Errorf("detaching source database: %w", err)
+    }
+
+    // Verify post count
+    var verifyCount int64
+    if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&verifyCount); err != nil {
+        return fmt.Errorf("verifying post count: %w", err)
+    }
+    if verifyCount != postCount {
+        return fmt.Errorf("post count mismatch: inserted %d but found %d", postCount, verifyCount)
+    }
+    slog.Info("extraction complete", "posts", verifyCount)
+
+    return nil
+}
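For orientation, a minimal sketch of the ATTACH-plus-INSERT ... SELECT pattern that ExtractDay is built around, using a throwaway one-table schema; the file names and columns here are illustrative and not the real firehose schema:

package main

import (
    "database/sql"
    "log"

    _ "modernc.org/sqlite"
)

func main() {
    // Open (and implicitly create) the destination database.
    dst, err := sql.Open("sqlite", "dst.db")
    if err != nil {
        log.Fatal(err)
    }
    defer dst.Close()
    // Pin to a single connection so the ATTACH below is visible to every later statement.
    dst.SetMaxOpenConns(1)

    // Attach an existing source database under the alias "src"; "src.db" is
    // assumed to already contain a posts(id, timestamp) table.
    if _, err := dst.Exec("ATTACH DATABASE ? AS src", "src.db"); err != nil {
        log.Fatal(err)
    }

    // Create the destination table, then bulk-copy only the rows in range.
    if _, err := dst.Exec("CREATE TABLE posts (id INTEGER PRIMARY KEY, timestamp TEXT)"); err != nil {
        log.Fatal(err)
    }
    res, err := dst.Exec(
        "INSERT INTO posts SELECT * FROM src.posts WHERE timestamp >= ? AND timestamp < ?",
        "2024-06-01T00:00:00", "2024-06-02T00:00:00",
    )
    if err != nil {
        log.Fatal(err)
    }
    n, _ := res.RowsAffected()
    log.Printf("copied %d rows", n)

    if _, err := dst.Exec("DETACH DATABASE src"); err != nil {
        log.Fatal(err)
    }
}

Deferring index creation until after the bulk copy, as ExtractDay does, avoids paying index maintenance on every inserted row.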
fmt.Errorf("scanning index DDL: %w", err) + } + idxStatements = append(idxStatements, idxSQL) + } + if err := idxRows.Err(); err != nil { + return fmt.Errorf("iterating index rows: %w", err) + } + + for _, idxSQL := range idxStatements { + if _, err := db.Exec(idxSQL); err != nil { + return fmt.Errorf("creating index: %w\nDDL: %s", err, idxSQL) + } + } + + // Detach source + if _, err := db.Exec("DETACH DATABASE src"); err != nil { + return fmt.Errorf("detaching source database: %w", err) + } + + // Verify post count + var verifyCount int64 + if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&verifyCount); err != nil { + return fmt.Errorf("verifying post count: %w", err) + } + if verifyCount != postCount { + return fmt.Errorf("post count mismatch: inserted %d but found %d", postCount, verifyCount) + } + slog.Info("extraction complete", "posts", verifyCount) + + return nil +} diff --git a/internal/bsdaily/prune.go b/internal/bsdaily/prune.go deleted file mode 100644 index 7aa6319..0000000 --- a/internal/bsdaily/prune.go +++ /dev/null @@ -1,143 +0,0 @@ -package bsdaily - -import ( - "database/sql" - "fmt" - "log/slog" - "os" - "time" - - _ "modernc.org/sqlite" -) - -func PruneDatabase(dbPath string, targetDay time.Time) error { - dayStart := targetDay.Format("2006-01-02") + "T00:00:00" - dayEnd := targetDay.AddDate(0, 0, 1).Format("2006-01-02") + "T00:00:00" - - slog.Info("pruning database", "keep_from", dayStart, "keep_until", dayEnd) - - // No foreign_keys — we handle junction tables manually to avoid per-row CASCADE overhead. - // journal_mode=MEMORY and synchronous=OFF are safe because this is a throwaway copy. - db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(MEMORY)&_pragma=synchronous(OFF)&_pragma=cache_size(-200000)") - if err != nil { - return fmt.Errorf("opening database: %w", err) - } - defer db.Close() - - // Verify we can query - var totalPosts int64 - if err := db.QueryRow("SELECT COUNT(*) FROM posts").Scan(&totalPosts); err != nil { - return fmt.Errorf("counting posts: %w", err) - } - slog.Info("total posts before pruning", "count", totalPosts) - - // Count posts in target range - var keepPosts int64 - if err := db.QueryRow("SELECT COUNT(*) FROM posts WHERE timestamp >= ? AND timestamp < ?", - dayStart, dayEnd).Scan(&keepPosts); err != nil { - return fmt.Errorf("counting target posts: %w", err) - } - slog.Info("posts in target day", "count", keepPosts, "date", targetDay.Format("2006-01-02")) - - if keepPosts == 0 { - return fmt.Errorf("no posts found for target day %s - aborting to avoid producing empty output", - targetDay.Format("2006-01-02")) - } - - tx, err := db.Begin() - if err != nil { - return fmt.Errorf("beginning transaction: %w", err) - } - defer tx.Rollback() - - // Delete junction table rows for posts outside target day (bulk, no CASCADE overhead) - slog.Info("deleting junction table rows for non-target posts") - keepSubquery := "SELECT id FROM posts WHERE timestamp >= ? AND timestamp < ?" 
diff --git a/internal/bsdaily/run.go b/internal/bsdaily/run.go
index 6a8cdba..ad9d9ec 100644
--- a/internal/bsdaily/run.go
+++ b/internal/bsdaily/run.go
@@ -1,6 +1,7 @@
 package bsdaily
 
 import (
+    "errors"
     "fmt"
     "log/slog"
     "os"
@@ -8,27 +9,20 @@ import (
     "time"
 )
 
-func Run(targetDate *time.Time) error {
+func Run(targetDates []time.Time) error {
     snapshotDir, snapshotDate, err := FindLatestDailySnapshot()
     if err != nil {
        return fmt.Errorf("finding latest snapshot: %w", err)
    }
    slog.Info("found latest daily snapshot", "dir", snapshotDir, "snapshot_date", snapshotDate.Format("2006-01-02"))
 
-    var targetDay time.Time
-    if targetDate != nil {
-        targetDay = *targetDate
-    } else {
-        targetDay = snapshotDate.AddDate(0, 0, -1)
+    if len(targetDates) == 0 {
+        targetDates = []time.Time{snapshotDate.AddDate(0, 0, -1)}
     }
-    slog.Info("target day for extraction", "date", targetDay.Format("2006-01-02"))
 
-    // Check if output already exists
-    outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
-    outputFinal := filepath.Join(outputDir, targetDay.Format("2006-01-02")+".sql.zst")
-    if _, err := os.Stat(outputFinal); err == nil {
-        return fmt.Errorf("output file already exists: %s", outputFinal)
-    }
+    slog.Info("target days for extraction", "count", len(targetDates),
+        "first", targetDates[0].Format("2006-01-02"),
+        "last", targetDates[len(targetDates)-1].Format("2006-01-02"))
 
     // Check disk space
     if err := CheckFreeSpace(TmpBase, MinTmpFreeBytes, "tmpBase"); err != nil {
@@ -82,46 +76,73 @@ func Run(targetDate *time.Time) error {
         }
     }
 
-    if err := CheckFreeSpace(TmpBase, PostCopyTmpMinFree, "tmpBase (post-copy)"); err != nil {
-        return err
+    // Process each day
+    processed := 0
+    skipped := 0
+
+    for _, targetDay := range targetDates {
+        dayStr := targetDay.Format("2006-01-02")
+        slog.Info("processing day", "date", dayStr)
+
+        // Check if output already exists
+        outputDir := filepath.Join(DailiesBase, targetDay.Format("2006-01"))
+        outputFinal := filepath.Join(outputDir, dayStr+".sql.zst")
+        if _, err := os.Stat(outputFinal); err == nil {
+            slog.Info("output already exists, skipping", "path", outputFinal)
+            skipped++
+            continue
+        }
+
+        // Extract target day into a per-day database
+        extractedDB := filepath.Join(tmpDir, "extracted-"+dayStr+".db")
+        slog.Info("extracting target day", "src", dstDB, "dst", extractedDB)
+        if err := ExtractDay(dstDB, extractedDB, targetDay); err != nil {
+            if errors.Is(err, ErrNoPosts) {
+                slog.Warn("no posts found, skipping day", "date", dayStr)
+                os.Remove(extractedDB)
+                skipped++
+                continue
+            }
+            return fmt.Errorf("extracting day %s: %w", dayStr, err)
+        }
+
+        // Dump to SQL and compress
+        if err := os.MkdirAll(outputDir, 0755); err != nil {
+            return fmt.Errorf("creating output directory %s: %w", outputDir, err)
+        }
+
+        outputTmp := filepath.Join(outputDir, "."+dayStr+".sql.zst.tmp")
+
+        slog.Info("dumping and compressing", "tmp_output", outputTmp)
+        if err := DumpAndCompress(extractedDB, outputTmp); err != nil {
+            os.Remove(outputTmp)
+            return fmt.Errorf("dump and compress for %s: %w", dayStr, err)
+        }
+
+        slog.Info("verifying compressed output")
+        if err := VerifyOutput(outputTmp); err != nil {
+            os.Remove(outputTmp)
+            return fmt.Errorf("verification failed for %s: %w", dayStr, err)
+        }
+
+        // Atomic rename to final path
+        slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
+        if err := os.Rename(outputTmp, outputFinal); err != nil {
+            return fmt.Errorf("atomic rename for %s: %w", dayStr, err)
+        }
+
+        info, err := os.Stat(outputFinal)
+        if err != nil {
+            return fmt.Errorf("stat final output: %w", err)
+        }
+        slog.Info("day completed", "date", dayStr, "path", outputFinal, "size_bytes", info.Size())
+
+        // Remove extracted DB to reclaim space
+        os.Remove(extractedDB)
+        processed++
     }
 
-    // Prune database to target day only
-    slog.Info("opening database for pruning", "path", dstDB)
-    if err := PruneDatabase(dstDB, targetDay); err != nil {
-        return fmt.Errorf("pruning database: %w", err)
-    }
-
-    // Dump to SQL and compress
-    if err := os.MkdirAll(outputDir, 0755); err != nil {
-        return fmt.Errorf("creating output directory %s: %w", outputDir, err)
-    }
-
-    outputTmp := filepath.Join(outputDir, "."+targetDay.Format("2006-01-02")+".sql.zst.tmp")
-
-    slog.Info("dumping and compressing", "tmp_output", outputTmp)
-    if err := DumpAndCompress(dstDB, outputTmp); err != nil {
-        os.Remove(outputTmp)
-        return fmt.Errorf("dump and compress: %w", err)
-    }
-
-    slog.Info("verifying compressed output")
-    if err := VerifyOutput(outputTmp); err != nil {
-        os.Remove(outputTmp)
-        return fmt.Errorf("verification failed: %w", err)
-    }
-
-    // Atomic rename to final path
-    slog.Info("renaming to final output", "from", outputTmp, "to", outputFinal)
-    if err := os.Rename(outputTmp, outputFinal); err != nil {
-        return fmt.Errorf("atomic rename: %w", err)
-    }
-
-    info, err := os.Stat(outputFinal)
-    if err != nil {
-        return fmt.Errorf("stat final output: %w", err)
-    }
-    slog.Info("final output written", "path", outputFinal, "size_bytes", info.Size())
+    slog.Info("run summary", "processed", processed, "skipped", skipped, "total", len(targetDates))
 
     return nil
 }