basic subscription works
authorEvgenii Akentev <i@ak3n.com>
Thu, 29 Aug 2024 17:13:41 +0000 (21:13 +0400)
committerEvgenii Akentev <i@ak3n.com>
Thu, 29 Aug 2024 17:13:41 +0000 (21:13 +0400)
.gitignore [new file with mode: 0644]
client.go [new file with mode: 0644]
entities.go [new file with mode: 0644]
go.mod [new file with mode: 0644]
go.sum [new file with mode: 0644]
hub.go [new file with mode: 0644]
main.go [new file with mode: 0644]

diff --git a/.gitignore b/.gitignore
new file mode 100644 (file)
index 0000000..477ea50
--- /dev/null
@@ -0,0 +1,2 @@
+.jj
+main
diff --git a/client.go b/client.go
new file mode 100644 (file)
index 0000000..e5ee567
--- /dev/null
+++ b/client.go
@@ -0,0 +1,134 @@
+package main
+
+import (
+       "bytes"
+       "log"
+       "net/http"
+       "time"
+
+       "github.com/gorilla/websocket"
+)
+
+const (
+       // Time allowed to write a message to the peer.
+       writeWait = 10 * time.Second
+
+       // Time allowed to read the next pong message from the peer.
+       pongWait = 60 * time.Second
+
+       // Send pings to peer with this period. Must be less than pongWait.
+       pingPeriod = (pongWait * 9) / 10
+
+       // Maximum message size allowed from peer.
+       maxMessageSize = 512
+)
+
+var (
+       newline = []byte{'\n'}
+       space   = []byte{' '}
+)
+
+var upgrader = websocket.Upgrader{
+       ReadBufferSize:  1024,
+       WriteBufferSize: 1024,
+  CheckOrigin: func(r *http.Request) bool{ return true },
+}
+
+// Client is a middleman between the websocket connection and the hub.
+type Client struct {
+       hub *Hub
+
+       // The websocket connection.
+       conn *websocket.Conn
+
+       // Buffered channel of outbound messages.
+       send chan []byte
+}
+
+// readPump pumps messages from the websocket connection to the hub.
+//
+// The application runs readPump in a per-connection goroutine. The application
+// ensures that there is at most one reader on a connection by executing all
+// reads from this goroutine.
+func (c *Client) readPump() {
+       defer func() {
+               c.hub.unregister <- c
+               c.conn.Close()
+       }()
+       c.conn.SetReadLimit(maxMessageSize)
+       c.conn.SetReadDeadline(time.Now().Add(pongWait))
+       c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil })
+       for {
+               _, message, err := c.conn.ReadMessage()
+               if err != nil {
+                       if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+                               log.Printf("error: %v", err)
+                       }
+                       break
+               }
+               message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
+               c.hub.broadcast <- message
+       }
+}
+
+// writePump pumps messages from the hub to the websocket connection.
+//
+// A goroutine running writePump is started for each connection. The
+// application ensures that there is at most one writer to a connection by
+// executing all writes from this goroutine.
+func (c *Client) writePump() {
+       ticker := time.NewTicker(pingPeriod)
+       defer func() {
+               ticker.Stop()
+               c.conn.Close()
+       }()
+       for {
+               select {
+               case message, ok := <-c.send:
+                       c.conn.SetWriteDeadline(time.Now().Add(writeWait))
+                       if !ok {
+                               // The hub closed the channel.
+                               c.conn.WriteMessage(websocket.CloseMessage, []byte{})
+                               return
+                       }
+
+                       w, err := c.conn.NextWriter(websocket.TextMessage)
+                       if err != nil {
+                               return
+                       }
+                       w.Write(message)
+
+                       // Add queued chat messages to the current websocket message.
+                       n := len(c.send)
+                       for i := 0; i < n; i++ {
+                               w.Write(newline)
+                               w.Write(<-c.send)
+                       }
+
+                       if err := w.Close(); err != nil {
+                               return
+                       }
+               case <-ticker.C:
+                       c.conn.SetWriteDeadline(time.Now().Add(writeWait))
+                       if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
+                               return
+                       }
+               }
+       }
+}
+
+func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
+       conn, err := upgrader.Upgrade(w, r, nil)
+       if err != nil {
+               log.Println(err)
+               return
+       }
+       client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)}
+       client.hub.register <- client
+
+       // Allow collection of memory referenced by the caller by doing all work in
+       // new goroutines.
+       go client.writePump()
+       go client.readPump()
+}
+
diff --git a/entities.go b/entities.go
new file mode 100644 (file)
index 0000000..23d6599
--- /dev/null
@@ -0,0 +1,45 @@
+package main
+
+
+import (
+//  "encoding/json"
+)
+
+type Message struct {
+  Event string `json:"event"`
+  Addrs []string `json:"addresses"`
+}
+
+type PersonDetails struct {
+  name string
+  address string
+}
+
+type DocumentData struct {
+}
+
+type DeclarationStatus string
+const (
+   Stale DeclarationStatus = "stale"
+   MoneyFrozen = "money_frozen"
+   FilesSent = "files_sent"
+   ViewedByClient = "viewed_by_client"
+)
+
+type ExclusiveRightsClaim struct {
+}
+
+type Copyright struct {
+  address string
+  author string
+  owner string
+  ownerDetails PersonDetails
+  royalty *string
+
+  document DocumentData
+  status DeclarationStatus
+  balance int
+  price *int
+  assignmentHash *string
+  claim *ExclusiveRightsClaim
+}
diff --git a/go.mod b/go.mod
new file mode 100644 (file)
index 0000000..6ed8fdc
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,5 @@
+module main
+
+go 1.22.5
+
+require github.com/gorilla/websocket v1.5.3
diff --git a/go.sum b/go.sum
new file mode 100644 (file)
index 0000000..25a9fc4
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,2 @@
+github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
+github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
diff --git a/hub.go b/hub.go
new file mode 100644 (file)
index 0000000..3efd65a
--- /dev/null
+++ b/hub.go
@@ -0,0 +1,49 @@
+package main
+
+// Hub maintains the set of active clients and broadcasts messages to the
+// clients.
+type Hub struct {
+       // Registered clients.
+       clients map[*Client]bool
+
+       // Inbound messages from the clients.
+       broadcast chan []byte
+
+       // Register requests from the clients.
+       register chan *Client
+
+       // Unregister requests from clients.
+       unregister chan *Client
+}
+
+func newHub() *Hub {
+       return &Hub{
+               broadcast:  make(chan []byte),
+               register:   make(chan *Client),
+               unregister: make(chan *Client),
+               clients:    make(map[*Client]bool),
+       }
+}
+
+func (h *Hub) run() {
+       for {
+               select {
+               case client := <-h.register:
+                       h.clients[client] = true
+               case client := <-h.unregister:
+                       if _, ok := h.clients[client]; ok {
+                               delete(h.clients, client)
+                               close(client.send)
+                       }
+               case message := <-h.broadcast:
+                       for client := range h.clients {
+                               select {
+                               case client.send <- message:
+                               default:
+                                       close(client.send)
+                                       delete(h.clients, client)
+                               }
+                       }
+               }
+       }
+}
diff --git a/main.go b/main.go
new file mode 100644 (file)
index 0000000..06c8641
--- /dev/null
+++ b/main.go
@@ -0,0 +1,106 @@
+package main
+
+import (
+       "flag"
+       "log"
+       "net/http"
+
+  "github.com/gorilla/websocket"
+)
+
+type Subscribers = map[string]([](*websocket.Conn))
+
+type State struct {
+  copyrights map[string]Copyright
+  copyrightSubs map[string]([]*websocket.Conn)
+  walletSubs map[string]([]*websocket.Conn)
+}
+
+func newState() *State {
+  return &State {
+    copyrights: make(map[string]Copyright),
+    copyrightSubs: make(map[string]([]*websocket.Conn)),
+    walletSubs: make(map[string]([]*websocket.Conn)),
+  }
+}
+
+func subscribe(m Subscribers, k string, c *websocket.Conn)  {
+  var newSubs [](*websocket.Conn)
+  if subs, ok := m[k]; ok {
+    newSubs = append(subs, c)
+    m[k] = newSubs
+  } else {
+    newSubs = make([](*websocket.Conn), 1)
+    newSubs[0] = c
+    m[k] = newSubs
+  }
+}
+
+func unsubscribe(m Subscribers, k string, c *websocket.Conn)  {
+  var newSubs [](*websocket.Conn)
+
+  if subs, ok := m[k]; ok {
+    for _, sub := range subs {
+      if (sub != c) {
+        newSubs = append(newSubs, sub)
+      }
+    }
+    m[k] = newSubs
+  }
+}
+
+func listenConnection(state *State, c *websocket.Conn) {
+  defer func() {
+    c.Close()
+  }()
+
+  for {
+    var m Message 
+    err := c.ReadJSON(&m)
+
+       if err != nil {
+                       if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
+                               log.Printf("error: %v", err)
+                       }
+      log.Printf("error: %v", err)
+                       break
+               }
+
+    switch m.Event {
+    case "subscribe-wallets":
+      for _, addr := range m.Addrs {
+        subscribe((*state).walletSubs, addr, c)
+      }
+    default:
+      log.Printf("error: Unsupported event: %v", m)
+    }
+  }
+}
+
+func serve(state *State, w http.ResponseWriter, r *http.Request) {
+  conn, err := upgrader.Upgrade(w, r, nil)
+  if err != nil {
+    log.Println(err)
+    return
+  }
+
+  go listenConnection(state, conn)
+}
+
+func main() {
+       flag.Parse()
+
+  state := newState()
+
+  http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+               serve(state, w, r)
+       })
+  
+  log.Println("Started websocket server at :3000")
+
+  err := http.ListenAndServe(":3000", nil)
+       if err != nil {
+               log.Fatal("ListenAndServe: ", err)
+       }
+}
+