Compare commits

...

2 Commits
master ... next

Author SHA1 Message Date
Jeffrey Paul 1b854bdf02 readme and license updates 2019-11-27 09:22:25 -08:00
Jeffrey Paul 3dde1d7019 starting to work on notifications 2019-11-18 00:14:05 -08:00
4 changed files with 127 additions and 37 deletions

13
LICENSE Normal file
View File

@ -0,0 +1,13 @@
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
Version 2, December 2004
Copyright (C) 2004 Sam Hocevar <sam@hocevar.net>
Everyone is permitted to copy and distribute verbatim or modified
copies of this license document, and changing it is allowed as long
as the name is changed.
DO WHAT THE FUCK YOU WANT TO PUBLIC LICENSE
TERMS AND CONDITIONS FOR COPYING, DISTRIBUTION AND MODIFICATION
0. You just DO WHAT THE FUCK YOU WANT TO.

View File

@ -1,6 +1,12 @@
# merp
```
_____ ___ ___ ___
| | -_| _| . |
|_|_|_|___|_| | _|
|_|
```
clone of a popular iot api to learn more go
Simple clone of a popular iot api (starts with `dw` and ends with `eet.io`)
for me to learn more go
# Features
@ -8,19 +14,27 @@ clone of a popular iot api to learn more go
* get latest merp
* get latest merps
# Internals
* Postgres db storage
# TODO
* merp notifications to long polling clients
* merp notifications via email
* pruning of old merps
* sending of merps using POST instead of querystring GET
* expiration/pruning of old merps
* receipt of merps using POSTs instead of querystring GET
# notes
# Notes
the source code from this repo and all deps are included in the Docker image
The source code from this repo and all deps are included in the Docker image
by the Dockerfile, vendoring them into the build artifact without requiring
that they be copied into the git repo.
# author
# License
sneak [sneak@sneak.berlin](mailto:sneak@sneak.berlin)
[WTFPL](http://www.wtfpl.net/)
# Author
sneak &lt;sneak@sneak.berlin&gt;

38
merp.go
View File

@ -10,6 +10,9 @@ import "github.com/google/uuid"
import "github.com/rs/zerolog/log"
import "github.com/sneak/merp/models"
const MAX_MERPS_TO_RETURN = 50
const LONGPOLL_TIMEOUT_SECS = 60
func decodeJSON(in []byte) (interface{}, error) {
var out interface{}
err := json.Unmarshal(in, &out)
@ -23,8 +26,33 @@ func decodeJSON(in []byte) (interface{}, error) {
func (ms *Server) listenForMerps() gin.HandlerFunc {
// /listen/for/merps/from/my-thing-name
return func(c *gin.Context) {
// step one is make new channel and add listener to ms structure
// wait until LONGPOLL_TIMEOUT secs for new merp
thing := c.Param("thing")
if thing != "" {
if thingRegex.MatchString(thing) == false {
c.JSON(http.StatusPreconditionFailed, gin.H{
"this": "failed",
"status": http.StatusPreconditionFailed,
"because": "invalid thing format, try [a-zA-Z0-9-_.]",
})
return
}
}
el := NewEventListener(MerpTopic(thing))
timeout := time.After(LONGPOLL_TIMEOUT_SECS * time.Second)
// Keep trying until we're timed out or got a result or got an error
for {
select {
case <-c.Done():
// unregister listener and close conn FIXME
case <-timeout:
// unregister listener and close conn FIXME
case newMerpJSON := <-el.NewMerpJSONChannel:
log.Info().Msg(newMerpJSON)
// send them some json FIXME
}
}
}
}
@ -45,9 +73,9 @@ func (ms *Server) getLatestMerps() gin.HandlerFunc {
var qs orm.QuerySeter
if thing == "" {
qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(50)
qs = ms.db.QueryTable("merp").OrderBy("-created").Limit(MAX_MERPS_TO_RETURN)
} else {
qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(50)
qs = ms.db.QueryTable("merp").Filter("thing", thing).OrderBy("-created").Limit(MAX_MERPS_TO_RETURN)
}
var merps []*models.Merp
@ -88,11 +116,9 @@ func (ms *Server) handleNewMerp() gin.HandlerFunc {
})
return
}
//web.Get(`/merp/for/([A-Za-z0-9\-\_\.]+)`, merpHandler)
// FIXME rate limit this a bit on thing+clientip+json to cut down on
// repeated messages
content := make(map[string]interface{})
respContent := gin.H{}
// FIXME support POST data as well

View File

@ -1,13 +1,16 @@
package merp
import "context"
import "encoding/json"
import "fmt"
import "net/http"
import "os"
import "os/signal"
import "regexp"
import "strconv"
import "time"
import "sync"
import "syscall"
import "time"
import "github.com/didip/tollbooth"
import "github.com/didip/tollbooth_gin"
@ -19,7 +22,7 @@ import "github.com/thoas/stats"
import "github.com/astaxie/beego/orm"
import _ "github.com/lib/pq" //revive:disable-line
var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-]+$`)
var thingRegex = regexp.MustCompile(`^[a-zA-Z0-9\_\-\.]+$`)
type MerpTopic string
@ -38,8 +41,8 @@ type Server struct {
port uint
server *http.Server
stats *stats.Stats
ll *sync.Mutex // protects listeners below
listeners map[MerpTopic][]*EventListener
ll *sync.Mutex // listeners [map] lock
}
// NewServer returns a Server, so that you can run the API.
@ -51,7 +54,33 @@ func NewServer() *Server {
return ms
}
func (ms *Server) addListener(topic MerpTopic, l *EventListener) {
type EventListener struct {
Topic MerpTopic
NewMerpJSONChannel chan string
}
func NewEventListener(t MerpTopic) *EventListener {
el := new(EventListener)
el.init()
el.Topic = t
return el
}
func (el *EventListener) init() {
el.NewMerpJSONChannel = make(chan string)
}
func (ms *Server) AddListener(l *EventListener) {
ms.ll.Lock()
defer ms.ll.Unlock()
if ms.listeners[l.Topic] == nil {
// is this an allocation DoS even with rate-limiting middleware?
ms.listeners[l.Topic] = make([]*EventListener, 0)
}
ms.listeners[l.Topic] = append(ms.listeners[l.Topic], l)
}
func (ms *Server) DelListener(l *EventListener) {
// FIXME(sneak)
}
@ -93,9 +122,33 @@ func (ms *Server) connectDB() {
// ServeForever causes merp to serve http forever
func (ms *Server) ServeForever() {
err := ms.server.ListenAndServe()
if err != nil {
panic(err)
// start server
go func() {
err := ms.server.ListenAndServe()
if err != nil {
panic(err)
}
}()
// listen for signals
quit := make(chan os.Signal)
// kill -9 is syscall.SIGKILL but can't be caught, so don't need to add it
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
// blocking wait for signal
<-quit
log.Info().Msg("shutting down server")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := ms.server.Shutdown(ctx); err != nil {
log.Error().Err(err)
}
// catching ctx.Done(). timeout of 5 seconds.
select {
case <-ctx.Done():
log.Info().Msg("server shutdown")
}
}
@ -167,19 +220,3 @@ func (ms *Server) setupRoutes() {
ms.gin = r
}
type EventListener struct {
thing string
notifications chan struct{}
}
func NewEventListener() *EventListener {
el := new(EventListener)
el.init()
return el
}
func (el *EventListener) init() {
el.notifications = make(chan struct{})
// NOOP for now
}