From 3dde1d70191b6ff2c289b4dcb31db8f18de088dd Mon Sep 17 00:00:00 2001 From: Jeffrey Paul Date: Mon, 18 Nov 2019 00:14:05 -0800 Subject: [PATCH] starting to work on notifications --- merp.go | 38 +++++++++++++++++++++---- server.go | 83 ++++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 92 insertions(+), 29 deletions(-) diff --git a/merp.go b/merp.go index 8e4c267..199f7e0 100644 --- a/merp.go +++ b/merp.go @@ -10,6 +10,9 @@ import "github.com/google/uuid" import "github.com/rs/zerolog/log" import "github.com/sneak/merp/models" +const MAX_MERPS_TO_RETURN = 50 +const LONGPOLL_TIMEOUT_SECS = 60 + func decodeJSON(in []byte) (interface{}, error) { var out interface{} err := json.Unmarshal(in, &out) @@ -23,8 +26,33 @@ func decodeJSON(in []byte) (interface{}, error) { func (ms *Server) listenForMerps() gin.HandlerFunc { // /listen/for/merps/from/my-thing-name return func(c *gin.Context) { - // step one is make new channel and add listener to ms structure - // wait until LONGPOLL_TIMEOUT secs for new merp + thing := c.Param("thing") + if thing != "" { + if thingRegex.MatchString(thing) == false { + c.JSON(http.StatusPreconditionFailed, gin.H{ + "this": "failed", + "status": http.StatusPreconditionFailed, + "because": "invalid thing format, try [a-zA-Z0-9-_.]", + }) + return + } + } + + el := NewEventListener(MerpTopic(thing)) + + timeout := time.After(LONGPOLL_TIMEOUT_SECS * time.Second) + // Keep trying until we're timed out or got a result or got an error + for { + select { + case <-c.Done(): + // unregister listener and close conn FIXME + case <-timeout: + // unregister listener and close conn FIXME + case newMerpJSON := <-el.NewMerpJSONChannel: + log.Info().Msg(newMerpJSON) + // send them some json FIXME + } + } } } @@ -45,9 +73,9 @@ func (ms *Server) getLatestMerps() gin.HandlerFunc { var qs orm.QuerySeter if thing == "" { - qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(50) + qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(MAX_MERPS_TO_RETURN) } else { - qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(50) + qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(MAX_MERPS_TO_RETURN) } var merps []*models.Merp @@ -88,11 +116,9 @@ func (ms *Server) handleNewMerp() gin.HandlerFunc { }) return } - //web.Get(`/merp/for/([A-Za-z0-9\-\_\.]+)`, merpHandler) // FIXME rate limit this a bit on thing+clientip+json to cut down on // repeated messages - content := make(map[string]interface{}) respContent := gin.H{} // FIXME support POST data as well diff --git a/server.go b/server.go index 630fd63..5d4a627 100644 --- a/server.go +++ b/server.go @@ -1,13 +1,16 @@ package merp +import "context" import "encoding/json" import "fmt" import "net/http" import "os" +import "os/signal" import "regexp" import "strconv" -import "time" import "sync" +import "syscall" +import "time" import "github.com/didip/tollbooth" import "github.com/didip/tollbooth_gin" @@ -19,7 +22,7 @@ import "github.com/thoas/stats" import "github.com/astaxie/beego/orm" import _ "github.com/lib/pq" //revive:disable-line -var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-]+$`) +var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-\.]+$`) type MerpTopic string @@ -38,8 +41,8 @@ type Server struct { port uint server *http.Server stats *stats.Stats + ll *sync.Mutex // protects listeners below listeners map[MerpTopic][]*EventListener - ll *sync.Mutex // listeners [map] lock } // NewServer returns a Server, so that you can run the API. @@ -51,7 +54,33 @@ func NewServer() *Server { return ms } -func (ms *Server) addListener(topic MerpTopic, l *EventListener) { +type EventListener struct { + Topic MerpTopic + NewMerpJSONChannel chan string +} + +func NewEventListener(t MerpTopic) *EventListener { + el := new(EventListener) + el.init() + el.Topic = t + return el +} + +func (el *EventListener) init() { + el.NewMerpJSONChannel = make(chan string) +} + +func (ms *Server) AddListener(l *EventListener) { + ms.ll.Lock() + defer ms.ll.Unlock() + if ms.listeners[l.Topic] == nil { + // is this an allocation DoS even with rate-limiting middleware? + ms.listeners[l.Topic] = make([]*EventListener, 0) + } + ms.listeners[l.Topic] = append(ms.listeners[l.Topic], l) +} + +func (ms *Server) DelListener(l *EventListener) { // FIXME(sneak) } @@ -93,9 +122,33 @@ func (ms *Server) connectDB() { // ServeForever causes merp to serve http forever func (ms *Server) ServeForever() { - err := ms.server.ListenAndServe() - if err != nil { - panic(err) + + // start server + go func() { + err := ms.server.ListenAndServe() + if err != nil { + panic(err) + } + }() + + // listen for signals + quit := make(chan os.Signal) + // kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + + // blocking wait for signal + <-quit + log.Info().Msg("shutting down server") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := ms.server.Shutdown(ctx); err != nil { + log.Error().Err(err) + } + // catching ctx.Done(). timeout of 5 seconds. + select { + case <-ctx.Done(): + log.Info().Msg("server shutdown") } } @@ -167,19 +220,3 @@ func (ms *Server) setupRoutes() { ms.gin = r } - -type EventListener struct { - thing string - notifications chan struct{} -} - -func NewEventListener() *EventListener { - el := new(EventListener) - el.init() - return el -} - -func (el *EventListener) init() { - el.notifications = make(chan struct{}) - // NOOP for now -}