starting to work on notifications

This commit is contained in:
Jeffrey Paul 2019-11-18 00:14:05 -08:00
parent e499651c39
commit 3dde1d7019
2 changed files with 92 additions and 29 deletions

38
merp.go
View File

@ -10,6 +10,9 @@ import "github.com/google/uuid"
import "github.com/rs/zerolog/log" import "github.com/rs/zerolog/log"
import "github.com/sneak/merp/models" import "github.com/sneak/merp/models"
const MAX_MERPS_TO_RETURN = 50
const LONGPOLL_TIMEOUT_SECS = 60
func decodeJSON(in []byte) (interface{}, error) { func decodeJSON(in []byte) (interface{}, error) {
var out interface{} var out interface{}
err := json.Unmarshal(in, &out) err := json.Unmarshal(in, &out)
@ -23,8 +26,33 @@ func decodeJSON(in []byte) (interface{}, error) {
func (ms *Server) listenForMerps() gin.HandlerFunc { func (ms *Server) listenForMerps() gin.HandlerFunc {
// /listen/for/merps/from/my-thing-name // /listen/for/merps/from/my-thing-name
return func(c *gin.Context) { return func(c *gin.Context) {
// step one is make new channel and add listener to ms structure thing := c.Param("thing")
// wait until LONGPOLL_TIMEOUT secs for new merp if thing != "" {
if thingRegex.MatchString(thing) == false {
c.JSON(http.StatusPreconditionFailed, gin.H{
"this": "failed",
"status": http.StatusPreconditionFailed,
"because": "invalid thing format, try [a-zA-Z0-9-_.]",
})
return
}
}
el := NewEventListener(MerpTopic(thing))
timeout := time.After(LONGPOLL_TIMEOUT_SECS * time.Second)
// Keep trying until we're timed out or got a result or got an error
for {
select {
case <-c.Done():
// unregister listener and close conn FIXME
case <-timeout:
// unregister listener and close conn FIXME
case newMerpJSON := <-el.NewMerpJSONChannel:
log.Info().Msg(newMerpJSON)
// send them some json FIXME
}
}
} }
} }
@ -45,9 +73,9 @@ func (ms *Server) getLatestMerps() gin.HandlerFunc {
var qs orm.QuerySeter var qs orm.QuerySeter
if thing == "" { if thing == "" {
qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(50) qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(MAX_MERPS_TO_RETURN)
} else { } else {
qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(50) qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(MAX_MERPS_TO_RETURN)
} }
var merps []*models.Merp var merps []*models.Merp
@ -88,11 +116,9 @@ func (ms *Server) handleNewMerp() gin.HandlerFunc {
}) })
return return
} }
//web.Get(`/merp/for/([A-Za-z0-9\-\_\.]+)`, merpHandler)
// FIXME rate limit this a bit on thing+clientip+json to cut down on // FIXME rate limit this a bit on thing+clientip+json to cut down on
// repeated messages // repeated messages
content := make(map[string]interface{}) content := make(map[string]interface{})
respContent := gin.H{} respContent := gin.H{}
// FIXME support POST data as well // FIXME support POST data as well

View File

@ -1,13 +1,16 @@
package merp package merp
import "context"
import "encoding/json" import "encoding/json"
import "fmt" import "fmt"
import "net/http" import "net/http"
import "os" import "os"
import "os/signal"
import "regexp" import "regexp"
import "strconv" import "strconv"
import "time"
import "sync" import "sync"
import "syscall"
import "time"
import "github.com/didip/tollbooth" import "github.com/didip/tollbooth"
import "github.com/didip/tollbooth_gin" import "github.com/didip/tollbooth_gin"
@ -19,7 +22,7 @@ import "github.com/thoas/stats"
import "github.com/astaxie/beego/orm" import "github.com/astaxie/beego/orm"
import _ "github.com/lib/pq" //revive:disable-line import _ "github.com/lib/pq" //revive:disable-line
var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-]+$`) var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-\.]+$`)
type MerpTopic string type MerpTopic string
@ -38,8 +41,8 @@ type Server struct {
port uint port uint
server *http.Server server *http.Server
stats *stats.Stats stats *stats.Stats
ll *sync.Mutex // protects listeners below
listeners map[MerpTopic][]*EventListener listeners map[MerpTopic][]*EventListener
ll *sync.Mutex // listeners [map] lock
} }
// NewServer returns a Server, so that you can run the API. // NewServer returns a Server, so that you can run the API.
@ -51,7 +54,33 @@ func NewServer() *Server {
return ms return ms
} }
func (ms *Server) addListener(topic MerpTopic, l *EventListener) { type EventListener struct {
Topic MerpTopic
NewMerpJSONChannel chan string
}
func NewEventListener(t MerpTopic) *EventListener {
el := new(EventListener)
el.init()
el.Topic = t
return el
}
func (el *EventListener) init() {
el.NewMerpJSONChannel = make(chan string)
}
func (ms *Server) AddListener(l *EventListener) {
ms.ll.Lock()
defer ms.ll.Unlock()
if ms.listeners[l.Topic] == nil {
// is this an allocation DoS even with rate-limiting middleware?
ms.listeners[l.Topic] = make([]*EventListener, 0)
}
ms.listeners[l.Topic] = append(ms.listeners[l.Topic], l)
}
func (ms *Server) DelListener(l *EventListener) {
// FIXME(sneak) // FIXME(sneak)
} }
@ -93,10 +122,34 @@ func (ms *Server) connectDB() {
// ServeForever causes merp to serve http forever // ServeForever causes merp to serve http forever
func (ms *Server) ServeForever() { func (ms *Server) ServeForever() {
// start server
go func() {
err := ms.server.ListenAndServe() err := ms.server.ListenAndServe()
if err != nil { if err != nil {
panic(err) panic(err)
} }
}()
// listen for signals
quit := make(chan os.Signal)
// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
// blocking wait for signal
<-quit
log.Info().Msg("shutting down server")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ms.server.Shutdown(ctx); err != nil {
log.Error().Err(err)
}
// catching ctx.Done(). timeout of 5 seconds.
select {
case <-ctx.Done():
log.Info().Msg("server shutdown")
}
} }
func (ms *Server) healthCheckHandler() http.HandlerFunc { func (ms *Server) healthCheckHandler() http.HandlerFunc {
@ -167,19 +220,3 @@ func (ms *Server) setupRoutes() {
ms.gin = r ms.gin = r
} }
type EventListener struct {
thing string
notifications chan struct{}
}
func NewEventListener() *EventListener {
el := new(EventListener)
el.init()
return el
}
func (el *EventListener) init() {
el.notifications = make(chan struct{})
// NOOP for now
}