From: Evgenii Akentev Date: Mon, 2 Sep 2024 09:23:39 +0000 (+0400) Subject: Add example of the chain listener X-Git-Url: https://git.ak3n.com/?a=commitdiff_plain;h=refs%2Fheads%2Fmaster;p=hub.go.git Add example of the chain listener --- diff --git a/chain-listener.go b/chain-listener.go new file mode 100644 index 0000000..b702d26 --- /dev/null +++ b/chain-listener.go @@ -0,0 +1,52 @@ +package main + +import ( + "context" + "log" + + "github.com/xssnick/tonutils-go/liteclient" + "github.com/xssnick/tonutils-go/ton" + + "main/contracts" +) + +func listenChain() { + log.Println("Start liteclient") + client := liteclient.NewConnectionPool() + + var liteServers string + switch contracts.GetContractEnvironment() { + case "MAINNET": + liteServers = "https://ton.org/global.config.json" + default: + liteServers = "https://ton.org/testnet-global.config.json" + } + + // connect to mainnet lite servers + err := client.AddConnectionsFromConfigUrl(context.Background(), liteServers) + if err != nil { + log.Fatalln("connection err: ", err.Error()) + return + } + + api := ton.NewAPIClient(client, ton.ProofCheckPolicyFast).WithRetry() + ctx := client.StickyContext(context.Background()) + + for { + log.Println("get masterchain info") + b, err := api.CurrentMasterchainInfo(ctx) + if err != nil { + log.Fatalln("get block err:", err.Error()) + return + } + + log.Println("read declarations") + declarations := readDeclarations(api, ctx, b, nil) + + for _, decl := range declarations { + log.Println("%V", decl) + } + + + } +} diff --git a/client.go b/client.go index e5ee567..d5d8881 100644 --- a/client.go +++ b/client.go @@ -1,134 +1,91 @@ package main import ( - "bytes" - "log" "net/http" - "time" - + "log" "github.com/gorilla/websocket" ) -const ( - // Time allowed to write a message to the peer. - writeWait = 10 * time.Second - - // Time allowed to read the next pong message from the peer. - pongWait = 60 * time.Second - - // Send pings to peer with this period. Must be less than pongWait. - pingPeriod = (pongWait * 9) / 10 - - // Maximum message size allowed from peer. - maxMessageSize = 512 -) - -var ( - newline = []byte{'\n'} - space = []byte{' '} -) - var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool{ return true }, } -// Client is a middleman between the websocket connection and the hub. type Client struct { hub *Hub - - // The websocket connection. conn *websocket.Conn - - // Buffered channel of outbound messages. - send chan []byte + hubResponse chan HubResponse } -// readPump pumps messages from the websocket connection to the hub. -// -// The application runs readPump in a per-connection goroutine. The application -// ensures that there is at most one reader on a connection by executing all -// reads from this goroutine. -func (c *Client) readPump() { - defer func() { - c.hub.unregister <- c - c.conn.Close() - }() - c.conn.SetReadLimit(maxMessageSize) - c.conn.SetReadDeadline(time.Now().Add(pongWait)) - c.conn.SetPongHandler(func(string) error { c.conn.SetReadDeadline(time.Now().Add(pongWait)); return nil }) - for { - _, message, err := c.conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Printf("error: %v", err) - } - break - } - message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1)) - c.hub.broadcast <- message - } + +func (c *Client) run() { + go c.handleChannels() + go c.listenConnection() } -// writePump pumps messages from the hub to the websocket connection. -// -// A goroutine running writePump is started for each connection. The -// application ensures that there is at most one writer to a connection by -// executing all writes from this goroutine. -func (c *Client) writePump() { - ticker := time.NewTicker(pingPeriod) - defer func() { - ticker.Stop() - c.conn.Close() - }() - for { - select { - case message, ok := <-c.send: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if !ok { - // The hub closed the channel. - c.conn.WriteMessage(websocket.CloseMessage, []byte{}) - return - } +func (c *Client) handleChannels() { + defer func() { + close(c.hubResponse) + }() + + for { + select { + case response := <- c.hubResponse: + switch r := response.(type) { + case CopyrightsHubResponse: + c.conn.WriteJSON(CopyrightsResponse{"copyrights", r}) + case CopyrightStatesHubResponse: + c.conn.WriteJSON(CopyrightStatesResponse{"new-states", r}) + } + } + } +} - w, err := c.conn.NextWriter(websocket.TextMessage) - if err != nil { - return - } - w.Write(message) +func (c *Client) listenConnection() { + defer func() { + c.hub.unregister <- c + c.conn.Close() + close(c.hubResponse) + }() - // Add queued chat messages to the current websocket message. - n := len(c.send) - for i := 0; i < n; i++ { - w.Write(newline) - w.Write(<-c.send) - } + for { + var m Message + err := c.conn.ReadJSON(&m) - if err := w.Close(); err != nil { - return - } - case <-ticker.C: - c.conn.SetWriteDeadline(time.Now().Add(writeWait)) - if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil { - return + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { + log.Printf("error: %v", err) } + log.Printf("error: %v", err) + break } - } -} - -func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - client := &Client{hub: hub, conn: conn, send: make(chan []byte, 256)} - client.hub.register <- client - // Allow collection of memory referenced by the caller by doing all work in - // new goroutines. - go client.writePump() - go client.readPump() + switch m.Event { + case "subscribe-copyrights": + for _, addr := range m.Addrs { + c.hub.subscribe <- SubMessage{Contract, addr, c} + } + case "unsubscribe-copyrights": + for _, addr := range m.Addrs { + c.hub.unsubscribe <- SubMessage{Contract, addr, c} + } + case "subscribe-wallets": + for _, addr := range m.Addrs { + c.hub.subscribe <- SubMessage{Wallet, addr, c} + } + case "unsubscribe-wallets": + for _, addr := range m.Addrs { + c.hub.unsubscribe <- SubMessage{Wallet, addr, c} + } + case "get-copyrights": + c.hub.clientQuery <- ClientQuery{c, GetCopyrightsQuery(m.Addrs)} + case "get-copyrights-by-wallets": + c.hub.clientQuery <- ClientQuery{c, GetCopyrightsByWalletsQuery(m.Addrs)} + case "get-states": + c.hub.clientQuery <- ClientQuery{c, GetCopyrightStatesQuery(m.Addrs)} + default: + log.Printf("error: Unsupported event: %v", m) + } + } } - diff --git a/entities.go b/contract-entities.go similarity index 50% rename from entities.go rename to contract-entities.go index 23d6599..ed1a08f 100644 --- a/entities.go +++ b/contract-entities.go @@ -1,15 +1,5 @@ package main - -import ( -// "encoding/json" -) - -type Message struct { - Event string `json:"event"` - Addrs []string `json:"addresses"` -} - type PersonDetails struct { name string address string @@ -27,12 +17,13 @@ const ( ) type ExclusiveRightsClaim struct { + author Address } type Copyright struct { - address string - author string - owner string + address Address + author Address + owner Address ownerDetails PersonDetails royalty *string @@ -42,4 +33,28 @@ type Copyright struct { price *int assignmentHash *string claim *ExclusiveRightsClaim + + initTxLt int +} + +type CopyrightState struct { + owner Address + ownerDetails PersonDetails + claim *ExclusiveRightsClaim + assignmentHash *string + price *int + balance int + status DeclarationStatus +} + +func (c Copyright) mkState() *CopyrightState { + return &CopyrightState{ + owner: c.owner, + ownerDetails: c.ownerDetails, + claim: c.claim, + assignmentHash: c.assignmentHash, + price: c.price, + balance: c.balance, + status: c.status, + } } diff --git a/declarations.go b/declarations.go new file mode 100644 index 0000000..c23ffe7 --- /dev/null +++ b/declarations.go @@ -0,0 +1,63 @@ +package main + +import ( + "log" + "context" + + "github.com/xssnick/tonutils-go/ton" + + "main/contracts" +) + +type DeclareDocument struct { +} + +type DocumentDeclaration struct { + authorAddress Address + declarationAddress Address + txLt int + txHash string + declaration DeclareDocument +} + +func readDeclarations(client ton.APIClientWrapped, ctx context.Context, b *ton.BlockIDExt, prevLt *uint64) []DocumentDeclaration { + res, err := client.WaitForBlock(b.SeqNo).GetAccount(ctx, b, contracts.GetContractAddress()) + if err != nil { + log.Fatalln("get account err:", err.Error()) + return nil + } + + var declarations []DocumentDeclaration + + var lastLt = res.LastTxLT + var lastHash = res.LastTxHash + + for { + if lastLt == 0 { break } + + list, err := client.ListTransactions(ctx, contracts.GetContractAddress(), 15, lastLt, lastHash) + if err != nil { + log.Printf("send err: %s", err.Error()) + return nil + } + + for _, tx := range list { + + if prevLt != nil && tx.LT <= *prevLt { + return declarations + } + +// var declaration DocumentDeclaration + + outMessage := tx.IO.Out.List + log.Println("%V", outMessage) + +// append(declarations, declaration) + } + + lastLt = list[0].PrevTxLT + lastHash = list[0].PrevTxHash + } + + return declarations +} diff --git a/go.mod b/go.mod index 6ed8fdc..c8ff96b 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,14 @@ module main go 1.22.5 -require github.com/gorilla/websocket v1.5.3 +require ( + github.com/gorilla/websocket v1.5.3 + github.com/xssnick/tonutils-go v1.9.9 +) + +require ( + github.com/oasisprotocol/curve25519-voi v0.0.0-20220328075252-7dd334e3daae // indirect + github.com/sigurn/crc16 v0.0.0-20211026045750-20ab5afb07e3 // indirect + golang.org/x/crypto v0.17.0 // indirect + golang.org/x/sys v0.15.0 // indirect +) diff --git a/go.sum b/go.sum index 25a9fc4..cdd8d11 100644 --- a/go.sum +++ b/go.sum @@ -1,2 +1,12 @@ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/oasisprotocol/curve25519-voi v0.0.0-20220328075252-7dd334e3daae h1:7smdlrfdcZic4VfsGKD2ulWL804a4GVphr4s7WZxGiY= +github.com/oasisprotocol/curve25519-voi v0.0.0-20220328075252-7dd334e3daae/go.mod h1:hVoHR2EVESiICEMbg137etN/Lx+lSrHPTD39Z/uE+2s= +github.com/sigurn/crc16 v0.0.0-20211026045750-20ab5afb07e3 h1:aQKxg3+2p+IFXXg97McgDGT5zcMrQoi0EICZs8Pgchs= +github.com/sigurn/crc16 v0.0.0-20211026045750-20ab5afb07e3/go.mod h1:9/etS5gpQq9BJsJMWg1wpLbfuSnkm8dPF6FdW2JXVhA= +github.com/xssnick/tonutils-go v1.9.9 h1:J0hVJI4LNEFHqgRHzpWTjFuv/Ga89OqLRUc9gxmjCoc= +github.com/xssnick/tonutils-go v1.9.9/go.mod h1:p1l1Bxdv9sz6x2jfbuGQUGJn6g5cqg7xsTp8rBHFoJY= +golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= +golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/hub.go b/hub.go index 3efd65a..ece31fa 100644 --- a/hub.go +++ b/hub.go @@ -1,48 +1,83 @@ package main -// Hub maintains the set of active clients and broadcasts messages to the -// clients. +import ( + "slices" + "cmp" + "log" +) + type Hub struct { - // Registered clients. clients map[*Client]bool + + copyrights map[Address]*Copyright + clientQuery chan ClientQuery - // Inbound messages from the clients. - broadcast chan []byte + subs *Subscriptions + subscribe chan SubMessage + unsubscribe chan SubMessage - // Register requests from the clients. + messages chan string register chan *Client - - // Unregister requests from clients. unregister chan *Client } func newHub() *Hub { return &Hub{ - broadcast: make(chan []byte), + clients: make(map[*Client]bool), + copyrights: make(map[Address]*Copyright), + clientQuery: make(chan ClientQuery), + + subs: newSubscriptions(), + subscribe: make(chan SubMessage), + unsubscribe: make(chan SubMessage), + register: make(chan *Client), - unregister: make(chan *Client), - clients: make(map[*Client]bool), + unregister: make(chan *Client), } } func (h *Hub) run() { for { select { + case cq := <-h.clientQuery: + switch q := cq.query.(type) { + case GetCopyrightsQuery: + var copyrights []*Copyright + for _, addr := range q { + if c, ok := h.copyrights[addr]; ok { + copyrights = append(copyrights, c) + } + } + cq.client.hubResponse <- CopyrightsHubResponse(copyrights) + case GetCopyrightsByWalletsQuery: + var copyrights []*Copyright + for _, cp := range h.copyrights { + if (slices.Contains(q, cp.author) || slices.Contains(q, cp.owner) || slices.Contains(q, cp.claim.author)) { + copyrights = append(copyrights, cp) + } + } + slices.SortFunc(copyrights, func(a, b *Copyright) int { return cmp.Compare(a.initTxLt, b.initTxLt)}) + cq.client.hubResponse <- CopyrightsHubResponse(copyrights) + case GetCopyrightStatesQuery: + var copyrightStates = make(map[Address]*CopyrightState) + for _, addr := range q { + if cp, ok := h.copyrights[addr]; ok { + copyrightStates[addr] = cp.mkState() + } + } + cq.client.hubResponse <- CopyrightStatesHubResponse(copyrightStates) + } + case s := <-h.subscribe: + log.Println("%V", h.subs) + h.subs.subscribe(s.addressType, s.address, s.client) + log.Println("%V", h.subs) + case s := <-h.unsubscribe: + h.subs.unsubscribe(s.addressType, s.address, s.client) case client := <-h.register: h.clients[client] = true case client := <-h.unregister: if _, ok := h.clients[client]; ok { delete(h.clients, client) - close(client.send) - } - case message := <-h.broadcast: - for client := range h.clients { - select { - case client.send <- message: - default: - close(client.send) - delete(h.clients, client) - } } } } diff --git a/main.go b/main.go index 06c8641..0cd5b15 100644 --- a/main.go +++ b/main.go @@ -4,96 +4,20 @@ import ( "flag" "log" "net/http" - - "github.com/gorilla/websocket" + ) -type Subscribers = map[string]([](*websocket.Conn)) - -type State struct { - copyrights map[string]Copyright - copyrightSubs map[string]([]*websocket.Conn) - walletSubs map[string]([]*websocket.Conn) -} - -func newState() *State { - return &State { - copyrights: make(map[string]Copyright), - copyrightSubs: make(map[string]([]*websocket.Conn)), - walletSubs: make(map[string]([]*websocket.Conn)), - } -} - -func subscribe(m Subscribers, k string, c *websocket.Conn) { - var newSubs [](*websocket.Conn) - if subs, ok := m[k]; ok { - newSubs = append(subs, c) - m[k] = newSubs - } else { - newSubs = make([](*websocket.Conn), 1) - newSubs[0] = c - m[k] = newSubs - } -} - -func unsubscribe(m Subscribers, k string, c *websocket.Conn) { - var newSubs [](*websocket.Conn) - - if subs, ok := m[k]; ok { - for _, sub := range subs { - if (sub != c) { - newSubs = append(newSubs, sub) - } - } - m[k] = newSubs - } -} - -func listenConnection(state *State, c *websocket.Conn) { - defer func() { - c.Close() - }() - - for { - var m Message - err := c.ReadJSON(&m) - - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Printf("error: %v", err) - } - log.Printf("error: %v", err) - break - } - - switch m.Event { - case "subscribe-wallets": - for _, addr := range m.Addrs { - subscribe((*state).walletSubs, addr, c) - } - default: - log.Printf("error: Unsupported event: %v", m) - } - } -} - -func serve(state *State, w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - log.Println(err) - return - } - - go listenConnection(state, conn) -} - func main() { flag.Parse() + + hub := newHub() + + go hub.run() - state := newState() + go listenChain() http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - serve(state, w, r) + serve(hub, w, r) }) log.Println("Started websocket server at :3000") @@ -104,3 +28,16 @@ func main() { } } +func serve(hub *Hub, w http.ResponseWriter, r *http.Request) { + conn, err := upgrader.Upgrade(w, r, nil) + if err != nil { + log.Println(err) + return + } + + client := Client{hub: hub, conn: conn} + + hub.register <- &client + + go client.run() +} diff --git a/server-entities.go b/server-entities.go new file mode 100644 index 0000000..fedb571 --- /dev/null +++ b/server-entities.go @@ -0,0 +1,63 @@ +package main + +import ( +// "encoding/json" +) + +type Address string +type AddressType int + +const ( + Wallet = iota + Contract +) + +type SubMessage struct { + addressType AddressType + address Address + client *Client +} + +type Query interface { isQuery() } + +type GetCopyrightsQuery []Address +func (gc GetCopyrightsQuery) isQuery() {} + +type GetCopyrightsByWalletsQuery []Address +func (gc GetCopyrightsByWalletsQuery) isQuery() {} + +type GetCopyrightStatesQuery []Address +func (gc GetCopyrightStatesQuery) isQuery() {} + +type ClientQuery struct { + client *Client + query Query +} + +type HubResponse interface { isHubResponse() } + +type CopyrightsHubResponse []*Copyright +func (cr CopyrightsHubResponse) isHubResponse() {} + +type CopyrightStatesHubResponse map[Address]*CopyrightState +func (cr CopyrightStatesHubResponse) isHubResponse() {} + +type Message struct { + Event string `json:"event"` + Addrs []Address `json:"addresses"` +} + +type CopyrightsResponse struct { + Event string `json:"event"` + Data any `json:"data"` +} + +type CopyrightStatesResponse struct { + Event string `json:"event"` + States map[Address]*CopyrightState `json:"states"` +} + +type Response struct { + copyrightsResponse *CopyrightsResponse + copyrightStatesResponse *CopyrightStatesResponse +} diff --git a/subscriptions.go b/subscriptions.go new file mode 100644 index 0000000..03993f8 --- /dev/null +++ b/subscriptions.go @@ -0,0 +1,51 @@ +package main + +type Subscription map[Address](map[*Client]struct{}) + +type Subscriptions struct { + wallets Subscription + copyrights Subscription +} + +func newSubscriptions() *Subscriptions { + return &Subscriptions{ + wallets: make(map[Address](map[*Client]struct{})), + copyrights: make(map[Address](map[*Client]struct{})), + } +} + +func (s *Subscriptions) removeClient(c *Client) { + for _, v := range s.wallets { + delete(v, c) + } +} + +func (s *Subscriptions) getSub(at AddressType) Subscription { + switch at { + case Wallet: + return s.wallets + case Contract: + return s.copyrights + } + return nil +} + +func (s *Subscriptions) subscribe(at AddressType, addr Address, c *Client) { + var m = s.getSub(at) + if subs, ok := m[addr]; ok { + subs[c] = struct{}{} + //TODO: not sure if it works, maybe need m[addr] ... + } else { + newsubs := make(map[*Client]struct{}) + newsubs[c] = struct{}{} + m[addr] = newsubs + } +} + +func (s *Subscriptions) unsubscribe(at AddressType, addr Address, c *Client) { + var m = s.getSub(at) + if subs, ok := m[addr]; ok { + // TODO: not sure as well + delete(subs, c) + } +}