From 11fcf63f91a3c9fc1d38463cfd9666f18a51ded2 Mon Sep 17 00:00:00 2001 From: Evgenii Akentev Date: Thu, 29 Aug 2024 21:13:41 +0400 Subject: [PATCH] basic subscription works --- .gitignore | 2 + client.go | 134 ++++++++++++++++++++++++++++++++++++++++++++++++++++ entities.go | 45 ++++++++++++++++++ go.mod | 5 ++ go.sum | 2 + hub.go | 49 +++++++++++++++++++ main.go | 106 +++++++++++++++++++++++++++++++++++++++++ 7 files changed, 343 insertions(+) create mode 100644 .gitignore create mode 100644 client.go create mode 100644 entities.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 hub.go create mode 100644 main.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..477ea50 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.jj +main diff --git a/client.go b/client.go new file mode 100644 index 0000000..e5ee567 --- /dev/null +++ b/client.go @@ -0,0 +1,134 @@ +package main + +import ( + "bytes" + "log" + "net/http" + "time" + + "github.com/gorilla/websocket" +) + +const ( + // Time allowed to write a message to the peer. + writeWait = 10 * time.Second + + // Time allowed to read the next pong message from the peer. + pongWait = 60 * time.Second + + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + + // Maximum message size allowed from peer. + maxMessageSize = 512 +) + +var ( + newline = []byte{'\n'} + space = []byte{' '} +) + +var upgrader = websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool{ return true }, +} + +// Client is a middleman between the websocket connection and the hub. +type Client struct { + hub *Hub + + // The websocket connection. + conn *websocket.Conn + + // Buffered channel of outbound messages. + send chan []byte +} + +// readPump pumps messages from the websocket connection to the hub. +// +// The application runs readPump in a per-connection goroutine. The application +// ensures that there is at most one reader on a connection by executing all +// reads from this goroutine. +func (c *Client) readPump() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + }() + c.conn.SetReadLimit(maxMessageSize) + c.conn.SetReadDeadline(time.Now().Add(pongWait)) + c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) + for { + _, message, err := c.conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + break + } + message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) + c.hub.broadcast <- message + } +} + +// writePump pumps messages from the hub to the websocket connection. +// +// A goroutine running writePump is started for each connection. The +// application ensures that there is at most one writer to a connection by +// executing all writes from this goroutine. +func (c *Client) writePump() { + ticker := time.NewTicker(pingPeriod) + defer func() { + ticker.Stop() + c.conn.Close() + }() + for { + select { + case message, ok := <-c.send: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if !ok { + // The hub closed the channel. + c.conn.WriteMessage(websocket.CloseMessage, []byte{}) + return + } + + w, err := c.conn.NextWriter(websocket.TextMessage) + if err != nil { + return + } + w.Write(message) + + // Add queued chat messages to the current websocket message. + n := len(c.send) + for i := 0; i < n; i++ { + w.Write(newline) + w.Write(<-c.send) + } + + if err := w.Close(); err != nil { + return + } + case <-ticker.C: + c.conn.SetWriteDeadline(time.Now().Add(writeWait)) + if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + return + } + } + } +} + +func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} + client.hub.register <- client + + // Allow collection of memory referenced by the caller by doing all work in + // new goroutines. + go client.writePump() + go client.readPump() +} + diff --git a/entities.go b/entities.go new file mode 100644 index 0000000..23d6599 --- /dev/null +++ b/entities.go @@ -0,0 +1,45 @@ +package main + + +import ( +// "encoding/json" +) + +type Message struct { + Event string `json:"event"` + Addrs []string `json:"addresses"` +} + +type PersonDetails struct { + name string + address string +} + +type DocumentData struct { +} + +type DeclarationStatus string +const ( + Stale DeclarationStatus = "stale" + MoneyFrozen = "money_frozen" + FilesSent = "files_sent" + ViewedByClient = "viewed_by_client" +) + +type ExclusiveRightsClaim struct { +} + +type Copyright struct { + address string + author string + owner string + ownerDetails PersonDetails + royalty *string + + document DocumentData + status DeclarationStatus + balance int + price *int + assignmentHash *string + claim *ExclusiveRightsClaim +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..6ed8fdc --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module main + +go 1.22.5 + +require github.com/gorilla/websocket v1.5.3 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..25a9fc4 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/hub.go b/hub.go new file mode 100644 index 0000000..3efd65a --- /dev/null +++ b/hub.go @@ -0,0 +1,49 @@ +package main + +// Hub maintains the set of active clients and broadcasts messages to the +// clients. +type Hub struct { + // Registered clients. + clients map[*Client]bool + + // Inbound messages from the clients. + broadcast chan []byte + + // Register requests from the clients. + register chan *Client + + // Unregister requests from clients. + unregister chan *Client +} + +func newHub() *Hub { + return &Hub{ + broadcast: make(chan []byte), + register: make(chan *Client), + unregister: make(chan *Client), + clients: make(map[*Client]bool), + } +} + +func (h *Hub) run() { + for { + select { + case client := <-h.register: + h.clients[client] = true + case client := <-h.unregister: + if _, ok := h.clients[client]; ok { + delete(h.clients, client) + close(client.send) + } + case message := <-h.broadcast: + for client := range h.clients { + select { + case client.send <- message: + default: + close(client.send) + delete(h.clients, client) + } + } + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..06c8641 --- /dev/null +++ b/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "flag" + "log" + "net/http" + + "github.com/gorilla/websocket" +) + +type Subscribers = map[string]([](*websocket.Conn)) + +type State struct { + copyrights map[string]Copyright + copyrightSubs map[string]([]*websocket.Conn) + walletSubs map[string]([]*websocket.Conn) +} + +func newState() *State { + return &State { + copyrights: make(map[string]Copyright), + copyrightSubs: make(map[string]([]*websocket.Conn)), + walletSubs: make(map[string]([]*websocket.Conn)), + } +} + +func subscribe(m Subscribers, k string, c *websocket.Conn) { + var newSubs [](*websocket.Conn) + if subs, ok := m[k]; ok { + newSubs = append(subs, c) + m[k] = newSubs + } else { + newSubs = make([](*websocket.Conn), 1) + newSubs[0] = c + m[k] = newSubs + } +} + +func unsubscribe(m Subscribers, k string, c *websocket.Conn) { + var newSubs [](*websocket.Conn) + + if subs, ok := m[k]; ok { + for _, sub := range subs { + if (sub != c) { + newSubs = append(newSubs, sub) + } + } + m[k] = newSubs + } +} + +func listenConnection(state *State, c *websocket.Conn) { + defer func() { + c.Close() + }() + + for { + var m Message + err := c.ReadJSON(&m) + + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) + } + log.Printf("error: %v", err) + break + } + + switch m.Event { + case "subscribe-wallets": + for _, addr := range m.Addrs { + subscribe((*state).walletSubs, addr, c) + } + default: + log.Printf("error: Unsupported event: %v", m) + } + } +} + +func serve(state *State, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + go listenConnection(state, conn) +} + +func main() { + flag.Parse() + + state := newState() + + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + serve(state, w, r) + }) + + log.Println("Started websocket server at :3000") + + err := http.ListenAndServe(":3000", nil) + if err != nil { + log.Fatal("ListenAndServe: ", err) + } +} + -- 2.34.1