should backoff reasonably now
This commit is contained in:
parent
e6647e47f7
commit
f32deba38f
@ -2,9 +2,8 @@ package database
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"git.eeqj.de/sneak/feta/instance"
|
"git.eeqj.de/sneak/feta/instance"
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
|
|
||||||
_ "github.com/jinzhu/gorm/dialects/sqlite"
|
_ "github.com/jinzhu/gorm/dialects/sqlite"
|
||||||
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *Manager) SaveInstance(i *instance.Instance) error {
|
func (m *Manager) SaveInstance(i *instance.Instance) error {
|
||||||
@ -20,6 +19,7 @@ func (m *Manager) SaveInstance(i *instance.Instance) error {
|
|||||||
UUID: i.UUID,
|
UUID: i.UUID,
|
||||||
Disabled: i.Disabled,
|
Disabled: i.Disabled,
|
||||||
ErrorCount: i.ErrorCount,
|
ErrorCount: i.ErrorCount,
|
||||||
|
ConsecutiveErrorCount: i.ConsecutiveErrorCount,
|
||||||
FSMState: i.Status(),
|
FSMState: i.Status(),
|
||||||
Fetching: i.Fetching,
|
Fetching: i.Fetching,
|
||||||
HighestID: i.HighestID,
|
HighestID: i.HighestID,
|
||||||
@ -46,6 +46,7 @@ func (m *Manager) SaveInstance(i *instance.Instance) error {
|
|||||||
m.db.Where("UUID = ?", i.UUID).First(&ei)
|
m.db.Where("UUID = ?", i.UUID).First(&ei)
|
||||||
ei.Disabled = i.Disabled
|
ei.Disabled = i.Disabled
|
||||||
ei.ErrorCount = i.ErrorCount
|
ei.ErrorCount = i.ErrorCount
|
||||||
|
ei.ConsecutiveErrorCount = i.ConsecutiveErrorCount
|
||||||
ei.FSMState = i.Status()
|
ei.FSMState = i.Status()
|
||||||
ei.Fetching = i.Fetching
|
ei.Fetching = i.Fetching
|
||||||
ei.HighestID = i.HighestID
|
ei.HighestID = i.HighestID
|
||||||
@ -74,6 +75,7 @@ func (m *Manager) ListInstances() ([]*instance.Instance, error) {
|
|||||||
x.UUID = i.UUID
|
x.UUID = i.UUID
|
||||||
x.Disabled = i.Disabled
|
x.Disabled = i.Disabled
|
||||||
x.ErrorCount = i.ErrorCount
|
x.ErrorCount = i.ErrorCount
|
||||||
|
x.ConsecutiveErrorCount = i.ConsecutiveErrorCount
|
||||||
x.InitialFSMState = i.FSMState
|
x.InitialFSMState = i.FSMState
|
||||||
x.Fetching = i.Fetching
|
x.Fetching = i.Fetching
|
||||||
x.HighestID = i.HighestID
|
x.HighestID = i.HighestID
|
||||||
|
@ -27,6 +27,7 @@ type StoredToot struct {
|
|||||||
type APInstance struct {
|
type APInstance struct {
|
||||||
gorm.Model
|
gorm.Model
|
||||||
UUID uuid.UUID `gorm:"type:uuid;primary_key;"`
|
UUID uuid.UUID `gorm:"type:uuid;primary_key;"`
|
||||||
|
ConsecutiveErrorCount uint
|
||||||
ErrorCount uint
|
ErrorCount uint
|
||||||
SuccessCount uint
|
SuccessCount uint
|
||||||
HighestID uint
|
HighestID uint
|
||||||
|
@ -17,18 +17,19 @@ import (
|
|||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
//import "github.com/gin-gonic/gin"
|
|
||||||
|
|
||||||
const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
|
const nodeInfoSchemaVersionTwoName = "http://nodeinfo.diaspora.software/ns/schema/2.0"
|
||||||
const instanceNodeinfoTimeout = time.Second * 50
|
const instanceNodeinfoTimeout = time.Second * 60 * 2 // 2m
|
||||||
const instanceHTTPTimeout = time.Second * 120
|
const instanceHTTPTimeout = time.Second * 60 * 2 // 2m
|
||||||
const instanceSpiderInterval = time.Second * 120
|
const instanceSpiderInterval = time.Second * 60 * 2 // 2m
|
||||||
const instanceErrorInterval = time.Second * 60 * 30
|
const instanceErrorInterval = time.Second * 60 * 60 // 1h
|
||||||
|
const instancePersistentErrorInterval = time.Second * 86400 // 1d
|
||||||
|
const zeroInterval = time.Second * 0 // 0s
|
||||||
|
|
||||||
// Instance stores all the information we know about an instance
|
// Instance stores all the information we know about an instance
|
||||||
type Instance struct {
|
type Instance struct {
|
||||||
Disabled bool
|
Disabled bool
|
||||||
ErrorCount uint
|
ErrorCount uint
|
||||||
|
ConsecutiveErrorCount uint
|
||||||
FSM *fsm.FSM
|
FSM *fsm.FSM
|
||||||
Fetching bool
|
Fetching bool
|
||||||
HighestID uint
|
HighestID uint
|
||||||
@ -121,10 +122,26 @@ func (i *Instance) Unlock() {
|
|||||||
i.structLock.Unlock()
|
i.structLock.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Instance) bumpFetch() {
|
func (i *Instance) bumpFetchError() {
|
||||||
i.Lock()
|
i.Lock()
|
||||||
defer i.Unlock()
|
probablyDead := i.ConsecutiveErrorCount > 3
|
||||||
i.NextFetch = time.Now().Add(120 * time.Second)
|
i.Unlock()
|
||||||
|
|
||||||
|
if probablyDead {
|
||||||
|
// if three consecutive fetch errors happen, only try once per day:
|
||||||
|
i.setNextFetchAfter(instancePersistentErrorInterval)
|
||||||
|
} else {
|
||||||
|
// otherwise give them 1h
|
||||||
|
i.setNextFetchAfter(instanceErrorInterval)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Instance) bumpFetchSuccess() {
|
||||||
|
i.setNextFetchAfter(instanceSpiderInterval)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *Instance) scheduleFetchImmediate() {
|
||||||
|
i.setNextFetchAfter(zeroInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Instance) setNextFetchAfter(d time.Duration) {
|
func (i *Instance) setNextFetchAfter(d time.Duration) {
|
||||||
@ -139,8 +156,7 @@ func (i *Instance) Fetch() {
|
|||||||
i.fetchingLock.Lock()
|
i.fetchingLock.Lock()
|
||||||
defer i.fetchingLock.Unlock()
|
defer i.fetchingLock.Unlock()
|
||||||
|
|
||||||
i.setNextFetchAfter(instanceErrorInterval)
|
i.bumpFetchError()
|
||||||
|
|
||||||
err := i.DetectNodeTypeIfNecessary()
|
err := i.DetectNodeTypeIfNecessary()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debug().
|
log.Debug().
|
||||||
@ -149,8 +165,7 @@ func (i *Instance) Fetch() {
|
|||||||
Msg("unable to fetch instance metadata")
|
Msg("unable to fetch instance metadata")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
i.scheduleFetchImmediate()
|
||||||
i.setNextFetchAfter(instanceSpiderInterval)
|
|
||||||
log.Info().
|
log.Info().
|
||||||
Str("hostname", i.Hostname).
|
Str("hostname", i.Hostname).
|
||||||
Msg("instance now ready for fetch")
|
Msg("instance now ready for fetch")
|
||||||
@ -207,12 +222,14 @@ func (i *Instance) registerError() {
|
|||||||
i.Lock()
|
i.Lock()
|
||||||
defer i.Unlock()
|
defer i.Unlock()
|
||||||
i.ErrorCount++
|
i.ErrorCount++
|
||||||
|
i.ConsecutiveErrorCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *Instance) registerSuccess() {
|
func (i *Instance) registerSuccess() {
|
||||||
i.Lock()
|
i.Lock()
|
||||||
defer i.Unlock()
|
defer i.Unlock()
|
||||||
i.SuccessCount++
|
i.SuccessCount++
|
||||||
|
i.ConsecutiveErrorCount = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Up returns true if the success count is >0
|
// Up returns true if the success count is >0
|
||||||
@ -405,9 +422,12 @@ func (i *Instance) fetchRecentToots() error {
|
|||||||
|
|
||||||
// it turns out pleroma supports the mastodon api so we'll just use that
|
// it turns out pleroma supports the mastodon api so we'll just use that
|
||||||
// for everything for now
|
// for everything for now
|
||||||
|
|
||||||
|
// FIXME would be nice to support non-https
|
||||||
url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
|
url := fmt.Sprintf("https://%s/api/v1/timelines/public?limit=40&local=true",
|
||||||
i.Hostname)
|
i.Hostname)
|
||||||
|
|
||||||
|
// FIXME support broken/expired certs
|
||||||
var c = &http.Client{
|
var c = &http.Client{
|
||||||
Timeout: instanceHTTPTimeout,
|
Timeout: instanceHTTPTimeout,
|
||||||
}
|
}
|
||||||
@ -461,7 +481,7 @@ func (i *Instance) fetchRecentToots() error {
|
|||||||
Msgf("got and parsed toots")
|
Msgf("got and parsed toots")
|
||||||
i.registerSuccess()
|
i.registerSuccess()
|
||||||
i.Event("TOOTS_FETCHED")
|
i.Event("TOOTS_FETCHED")
|
||||||
i.setNextFetchAfter(instanceSpiderInterval)
|
i.bumpFetchSuccess()
|
||||||
|
|
||||||
// this should go fast as either the channel is buffered bigly or the
|
// this should go fast as either the channel is buffered bigly or the
|
||||||
// ingester receives fast and does its own buffering, but run it in its
|
// ingester receives fast and does its own buffering, but run it in its
|
||||||
|
Loading…
Reference in New Issue
Block a user