Add support for callback timeouts via context
This commit is contained in:
parent
5063e5f260
commit
edafec0fc7
5
irc.go
5
irc.go
@ -21,6 +21,7 @@ package irc
|
|||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -79,6 +80,10 @@ func (irc *Connection) readLoop() {
|
|||||||
irc.lastMessageMutex.Unlock()
|
irc.lastMessageMutex.Unlock()
|
||||||
event, err := parseToEvent(msg)
|
event, err := parseToEvent(msg)
|
||||||
event.Connection = irc
|
event.Connection = irc
|
||||||
|
event.Ctx = context.Background()
|
||||||
|
if irc.CallbackTimeout != 0 {
|
||||||
|
event.Ctx, _ = context.WithTimeout(event.Ctx, irc.CallbackTimeout)
|
||||||
|
}
|
||||||
if err == nil {
|
if err == nil {
|
||||||
/* XXX: len(args) == 0: args should be empty */
|
/* XXX: len(args) == 0: args should be empty */
|
||||||
irc.RunCallbacks(event)
|
irc.RunCallbacks(event)
|
||||||
|
@ -1,6 +1,9 @@
|
|||||||
package irc
|
package irc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
|
"reflect"
|
||||||
|
"runtime"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
@ -126,32 +129,61 @@ func (irc *Connection) RunCallbacks(event *Event) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
irc.eventsMutex.Lock()
|
irc.eventsMutex.Lock()
|
||||||
callbacks, ok := irc.events[event.Code]
|
callbacks := []func(*Event){}
|
||||||
irc.eventsMutex.Unlock()
|
eventCallbacks, ok := irc.events[event.Code]
|
||||||
if ok {
|
if ok {
|
||||||
|
for _, callback := range eventCallbacks {
|
||||||
|
callbacks = append(callbacks, callback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
allCallbacks, ok := irc.events["*"]
|
||||||
|
if ok {
|
||||||
|
for _, callback := range allCallbacks {
|
||||||
|
callbacks = append(callbacks, callback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
irc.eventsMutex.Unlock()
|
||||||
|
|
||||||
if irc.VerboseCallbackHandler {
|
if irc.VerboseCallbackHandler {
|
||||||
irc.Log.Printf("%v (%v) >> %#v\n", event.Code, len(callbacks), event)
|
irc.Log.Printf("%v (%v) >> %#v\n", event.Code, len(callbacks), event)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, callback := range callbacks {
|
done := make(chan bool)
|
||||||
|
possibleLogs := []string{}
|
||||||
|
for i, callback := range callbacks {
|
||||||
|
go func(done chan bool) {
|
||||||
callback(event)
|
callback(event)
|
||||||
|
done <- true
|
||||||
|
}(done)
|
||||||
|
callbackName := getFunctionName(callback)
|
||||||
|
start := time.Now()
|
||||||
|
select {
|
||||||
|
case <-event.Ctx.Done(): // context timed out!
|
||||||
|
irc.Log.Printf("TIMEOUT: %s timeout expired while executing %s, abandoning remaining callbacks", irc.CallbackTimeout, callbackName)
|
||||||
|
|
||||||
|
// If we timed out let's include context for how long each previous handler took
|
||||||
|
for _, logItem := range possibleLogs {
|
||||||
|
irc.Log.Println(logItem)
|
||||||
|
}
|
||||||
|
irc.Log.Printf("Callback %s ran for %s prior to timeout", callbackName, time.Since(start))
|
||||||
|
if len(callbacks) > i {
|
||||||
|
for _, callback := range callbacks[i+1:] {
|
||||||
|
irc.Log.Printf("Callback %s did not run", getFunctionName(callback))
|
||||||
}
|
}
|
||||||
} else if irc.VerboseCallbackHandler {
|
|
||||||
irc.Log.Printf("%v (0) >> %#v\n", event.Code, event)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
irc.eventsMutex.Lock()
|
// At this point our context has expired and it's not safe to execute anything else, lets bail.
|
||||||
allcallbacks, ok := irc.events["*"]
|
return
|
||||||
irc.eventsMutex.Unlock()
|
case <-done:
|
||||||
if ok {
|
elapsed := time.Since(start)
|
||||||
if irc.VerboseCallbackHandler {
|
logMsg := fmt.Sprintf("Callback %s took %s", getFunctionName(callback), elapsed)
|
||||||
irc.Log.Printf("%v (0) >> %#v\n", event.Code, event)
|
possibleLogs = append(possibleLogs, logMsg)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for _, callback := range allcallbacks {
|
func getFunctionName(f func(*Event)) string {
|
||||||
callback(event)
|
return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set up some initial callbacks to handle the IRC/CTCP protocol.
|
// Set up some initial callbacks to handle the IRC/CTCP protocol.
|
||||||
|
@ -5,6 +5,7 @@
|
|||||||
package irc
|
package irc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
@ -29,6 +30,7 @@ type Connection struct {
|
|||||||
TLSConfig *tls.Config
|
TLSConfig *tls.Config
|
||||||
Version string
|
Version string
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
|
CallbackTimeout time.Duration
|
||||||
PingFreq time.Duration
|
PingFreq time.Duration
|
||||||
KeepAlive time.Duration
|
KeepAlive time.Duration
|
||||||
Server string
|
Server string
|
||||||
@ -69,6 +71,7 @@ type Event struct {
|
|||||||
Arguments []string
|
Arguments []string
|
||||||
Tags map[string]string
|
Tags map[string]string
|
||||||
Connection *Connection
|
Connection *Connection
|
||||||
|
Ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
// Retrieve the last message from Event arguments.
|
// Retrieve the last message from Event arguments.
|
||||||
|
Loading…
Reference in New Issue
Block a user