Compare commits
2 Commits
Author | SHA1 | Date |
---|---|---|
Jeffrey Paul | 1b854bdf02 | |
Jeffrey Paul | 3dde1d7019 |
|
@ -0,0 +1,13 @@
|
||||||
|
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||||
|
Version 2, December 2004
|
||||||
|
|
||||||
|
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
|
||||||
|
|
||||||
|
Everyone is permitted to copy and distribute verbatim or modified
|
||||||
|
copies of this license document, and changing it is allowed as long
|
||||||
|
as the name is changed.
|
||||||
|
|
||||||
|
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
|
||||||
|
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
|
||||||
|
|
||||||
|
0. You just DO WHAT THE FUCK YOU WANT TO.
|
30
README.md
30
README.md
|
@ -1,6 +1,12 @@
|
||||||
# merp
|
```
|
||||||
|
_____ ___ ___ ___
|
||||||
|
| | -_| _| . |
|
||||||
|
|_|_|_|___|_| | _|
|
||||||
|
|_|
|
||||||
|
```
|
||||||
|
|
||||||
clone of a popular iot api to learn more go
|
Simple clone of a popular iot api (starts with `dw` and ends with `eet.io`)
|
||||||
|
for me to learn more go
|
||||||
|
|
||||||
# Features
|
# Features
|
||||||
|
|
||||||
|
@ -8,19 +14,27 @@ clone of a popular iot api to learn more go
|
||||||
* get latest merp
|
* get latest merp
|
||||||
* get latest merps
|
* get latest merps
|
||||||
|
|
||||||
|
# Internals
|
||||||
|
|
||||||
|
* Postgres db storage
|
||||||
|
|
||||||
# TODO
|
# TODO
|
||||||
|
|
||||||
* merp notifications to long polling clients
|
* merp notifications to long polling clients
|
||||||
* merp notifications via email
|
* merp notifications via email
|
||||||
* pruning of old merps
|
* expiration/pruning of old merps
|
||||||
* sending of merps using POST instead of querystring GET
|
* receipt of merps using POSTs instead of querystring GET
|
||||||
|
|
||||||
# notes
|
# Notes
|
||||||
|
|
||||||
the source code from this repo and all deps are included in the Docker image
|
The source code from this repo and all deps are included in the Docker image
|
||||||
by the Dockerfile, vendoring them into the build artifact without requiring
|
by the Dockerfile, vendoring them into the build artifact without requiring
|
||||||
that they be copied into the git repo.
|
that they be copied into the git repo.
|
||||||
|
|
||||||
# author
|
# License
|
||||||
|
|
||||||
sneak [sneak@sneak.berlin](mailto:sneak@sneak.berlin)
|
[WTFPL](http://www.wtfpl.net/)
|
||||||
|
|
||||||
|
# Author
|
||||||
|
|
||||||
|
sneak <sneak@sneak.berlin>
|
||||||
|
|
38
merp.go
38
merp.go
|
@ -10,6 +10,9 @@ import "github.com/google/uuid"
|
||||||
import "github.com/rs/zerolog/log"
|
import "github.com/rs/zerolog/log"
|
||||||
import "github.com/sneak/merp/models"
|
import "github.com/sneak/merp/models"
|
||||||
|
|
||||||
|
const MAX_MERPS_TO_RETURN = 50
|
||||||
|
const LONGPOLL_TIMEOUT_SECS = 60
|
||||||
|
|
||||||
func decodeJSON(in []byte) (interface{}, error) {
|
func decodeJSON(in []byte) (interface{}, error) {
|
||||||
var out interface{}
|
var out interface{}
|
||||||
err := json.Unmarshal(in, &out)
|
err := json.Unmarshal(in, &out)
|
||||||
|
@ -23,8 +26,33 @@ func decodeJSON(in []byte) (interface{}, error) {
|
||||||
func (ms *Server) listenForMerps() gin.HandlerFunc {
|
func (ms *Server) listenForMerps() gin.HandlerFunc {
|
||||||
// /listen/for/merps/from/my-thing-name
|
// /listen/for/merps/from/my-thing-name
|
||||||
return func(c *gin.Context) {
|
return func(c *gin.Context) {
|
||||||
// step one is make new channel and add listener to ms structure
|
thing := c.Param("thing")
|
||||||
// wait until LONGPOLL_TIMEOUT secs for new merp
|
if thing != "" {
|
||||||
|
if thingRegex.MatchString(thing) == false {
|
||||||
|
c.JSON(http.StatusPreconditionFailed, gin.H{
|
||||||
|
"this": "failed",
|
||||||
|
"status": http.StatusPreconditionFailed,
|
||||||
|
"because": "invalid thing format, try [a-zA-Z0-9-_.]",
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
el := NewEventListener(MerpTopic(thing))
|
||||||
|
|
||||||
|
timeout := time.After(LONGPOLL_TIMEOUT_SECS * time.Second)
|
||||||
|
// Keep trying until we're timed out or got a result or got an error
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-c.Done():
|
||||||
|
// unregister listener and close conn FIXME
|
||||||
|
case <-timeout:
|
||||||
|
// unregister listener and close conn FIXME
|
||||||
|
case newMerpJSON := <-el.NewMerpJSONChannel:
|
||||||
|
log.Info().Msg(newMerpJSON)
|
||||||
|
// send them some json FIXME
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,9 +73,9 @@ func (ms *Server) getLatestMerps() gin.HandlerFunc {
|
||||||
|
|
||||||
var qs orm.QuerySeter
|
var qs orm.QuerySeter
|
||||||
if thing == "" {
|
if thing == "" {
|
||||||
qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(50)
|
qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(MAX_MERPS_TO_RETURN)
|
||||||
} else {
|
} else {
|
||||||
qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(50)
|
qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(MAX_MERPS_TO_RETURN)
|
||||||
}
|
}
|
||||||
|
|
||||||
var merps []*models.Merp
|
var merps []*models.Merp
|
||||||
|
@ -88,11 +116,9 @@ func (ms *Server) handleNewMerp() gin.HandlerFunc {
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
//web.Get(`/merp/for/([A-Za-z0-9\-\_\.]+)`, merpHandler)
|
|
||||||
|
|
||||||
// FIXME rate limit this a bit on thing+clientip+json to cut down on
|
// FIXME rate limit this a bit on thing+clientip+json to cut down on
|
||||||
// repeated messages
|
// repeated messages
|
||||||
|
|
||||||
content := make(map[string]interface{})
|
content := make(map[string]interface{})
|
||||||
respContent := gin.H{}
|
respContent := gin.H{}
|
||||||
// FIXME support POST data as well
|
// FIXME support POST data as well
|
||||||
|
|
83
server.go
83
server.go
|
@ -1,13 +1,16 @@
|
||||||
package merp
|
package merp
|
||||||
|
|
||||||
|
import "context"
|
||||||
import "encoding/json"
|
import "encoding/json"
|
||||||
import "fmt"
|
import "fmt"
|
||||||
import "net/http"
|
import "net/http"
|
||||||
import "os"
|
import "os"
|
||||||
|
import "os/signal"
|
||||||
import "regexp"
|
import "regexp"
|
||||||
import "strconv"
|
import "strconv"
|
||||||
import "time"
|
|
||||||
import "sync"
|
import "sync"
|
||||||
|
import "syscall"
|
||||||
|
import "time"
|
||||||
|
|
||||||
import "github.com/didip/tollbooth"
|
import "github.com/didip/tollbooth"
|
||||||
import "github.com/didip/tollbooth_gin"
|
import "github.com/didip/tollbooth_gin"
|
||||||
|
@ -19,7 +22,7 @@ import "github.com/thoas/stats"
|
||||||
import "github.com/astaxie/beego/orm"
|
import "github.com/astaxie/beego/orm"
|
||||||
import _ "github.com/lib/pq" //revive:disable-line
|
import _ "github.com/lib/pq" //revive:disable-line
|
||||||
|
|
||||||
var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-]+$`)
|
var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-\.]+$`)
|
||||||
|
|
||||||
type MerpTopic string
|
type MerpTopic string
|
||||||
|
|
||||||
|
@ -38,8 +41,8 @@ type Server struct {
|
||||||
port uint
|
port uint
|
||||||
server *http.Server
|
server *http.Server
|
||||||
stats *stats.Stats
|
stats *stats.Stats
|
||||||
|
ll *sync.Mutex // protects listeners below
|
||||||
listeners map[MerpTopic][]*EventListener
|
listeners map[MerpTopic][]*EventListener
|
||||||
ll *sync.Mutex // listeners [map] lock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer returns a Server, so that you can run the API.
|
// NewServer returns a Server, so that you can run the API.
|
||||||
|
@ -51,7 +54,33 @@ func NewServer() *Server {
|
||||||
return ms
|
return ms
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ms *Server) addListener(topic MerpTopic, l *EventListener) {
|
type EventListener struct {
|
||||||
|
Topic MerpTopic
|
||||||
|
NewMerpJSONChannel chan string
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewEventListener(t MerpTopic) *EventListener {
|
||||||
|
el := new(EventListener)
|
||||||
|
el.init()
|
||||||
|
el.Topic = t
|
||||||
|
return el
|
||||||
|
}
|
||||||
|
|
||||||
|
func (el *EventListener) init() {
|
||||||
|
el.NewMerpJSONChannel = make(chan string)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *Server) AddListener(l *EventListener) {
|
||||||
|
ms.ll.Lock()
|
||||||
|
defer ms.ll.Unlock()
|
||||||
|
if ms.listeners[l.Topic] == nil {
|
||||||
|
// is this an allocation DoS even with rate-limiting middleware?
|
||||||
|
ms.listeners[l.Topic] = make([]*EventListener, 0)
|
||||||
|
}
|
||||||
|
ms.listeners[l.Topic] = append(ms.listeners[l.Topic], l)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ms *Server) DelListener(l *EventListener) {
|
||||||
// FIXME(sneak)
|
// FIXME(sneak)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,9 +122,33 @@ func (ms *Server) connectDB() {
|
||||||
|
|
||||||
// ServeForever causes merp to serve http forever
|
// ServeForever causes merp to serve http forever
|
||||||
func (ms *Server) ServeForever() {
|
func (ms *Server) ServeForever() {
|
||||||
err := ms.server.ListenAndServe()
|
|
||||||
if err != nil {
|
// start server
|
||||||
panic(err)
|
go func() {
|
||||||
|
err := ms.server.ListenAndServe()
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// listen for signals
|
||||||
|
quit := make(chan os.Signal)
|
||||||
|
// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
|
||||||
|
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
|
||||||
|
|
||||||
|
// blocking wait for signal
|
||||||
|
<-quit
|
||||||
|
log.Info().Msg("shutting down server")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if err := ms.server.Shutdown(ctx); err != nil {
|
||||||
|
log.Error().Err(err)
|
||||||
|
}
|
||||||
|
// catching ctx.Done(). timeout of 5 seconds.
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Info().Msg("server shutdown")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -167,19 +220,3 @@ func (ms *Server) setupRoutes() {
|
||||||
|
|
||||||
ms.gin = r
|
ms.gin = r
|
||||||
}
|
}
|
||||||
|
|
||||||
type EventListener struct {
|
|
||||||
thing string
|
|
||||||
notifications chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewEventListener() *EventListener {
|
|
||||||
el := new(EventListener)
|
|
||||||
el.init()
|
|
||||||
return el
|
|
||||||
}
|
|
||||||
|
|
||||||
func (el *EventListener) init() {
|
|
||||||
el.notifications = make(chan struct{})
|
|
||||||
// NOOP for now
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue