From a80134fa29fb5165656f2bdc6f69bc7a41076cd0 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Tue, 3 Sep 2019 12:00:47 +0000 Subject: [PATCH] multi: make rebroadcast winners & missed ws only. --- rpcserver.go | 63 ----------------------------- rpcserverhelp.go | 6 +-- rpcwebsocket.go | 103 +++++++++++++++++++++++++++++++++++++---------- 3 files changed, 84 insertions(+), 88 deletions(-) diff --git a/rpcserver.go b/rpcserver.go index 496435fa..3bfd155e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -226,8 +226,6 @@ var rpcHandlersBeforeInit = map[types.Method]commandHandler{ "missedtickets": handleMissedTickets, "node": handleNode, "ping": handlePing, - "rebroadcastmissed": handleRebroadcastMissed, - "rebroadcastwinners": handleRebroadcastWinners, "searchrawtransactions": handleSearchRawTransactions, "sendrawtransaction": handleSendRawTransaction, "setgenerate": handleSetGenerate, @@ -4496,67 +4494,6 @@ func handlePing(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter return nil, nil } -// handleRebroadcastMissed implements the rebroadcastmissed command. -// -// TODO make this a websocket only command. -func handleRebroadcastMissed(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - best := s.server.chain.BestSnapshot() - mt, err := s.server.chain.MissedTickets() - if err != nil { - return nil, rpcInternalError("Could not get missed tickets "+ - err.Error(), "") - } - - stakeDiff, err := s.server.blockManager.CalcNextRequiredStakeDifficulty() - if err != nil { - return nil, rpcInternalError("Could not calculate next stake "+ - "difficulty "+err.Error(), "") - } - - missedTicketsNtfn := &blockchain.TicketNotificationsData{ - Hash: best.Hash, - Height: best.Height, - StakeDifficulty: stakeDiff, - TicketsSpent: []chainhash.Hash{}, - TicketsMissed: mt, - TicketsNew: []chainhash.Hash{}, - } - - s.ntfnMgr.NotifySpentAndMissedTickets(missedTicketsNtfn) - - return nil, nil -} - -// handleRebroadcastWinners implements the rebroadcastwinners command. -// -// TODO make this a websocket only command. -func handleRebroadcastWinners(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) { - bestHeight := s.server.chain.BestSnapshot().Height - blocks, err := s.server.blockManager.TipGeneration() - if err != nil { - return nil, rpcInternalError("Could not get generation "+ - err.Error(), "") - } - - for i := range blocks { - winningTickets, _, _, err := - s.server.chain.LotteryDataForBlock(&blocks[i]) - if err != nil { - return nil, rpcInternalError("Lottery data for block "+ - "failed: "+err.Error(), "") - } - ntfnData := &WinningTicketsNtfnData{ - BlockHash: blocks[i], - BlockHeight: bestHeight, - Tickets: winningTickets, - } - - s.ntfnMgr.NotifyWinningTickets(ntfnData) - } - - return nil, nil -} - // retrievedTx represents a transaction that was either loaded from the // transaction memory pool or from the database. When a transaction is loaded // from the database, it is loaded with the raw serialized bytes while the diff --git a/rpcserverhelp.go b/rpcserverhelp.go index 5d52cba3..7cff284f 100644 --- a/rpcserverhelp.go +++ b/rpcserverhelp.go @@ -1030,8 +1030,6 @@ var rpcResultTypes = map[types.Method][]interface{}{ "missedtickets": {(*types.MissedTicketsResult)(nil)}, "node": nil, "ping": nil, - "rebroadcastmissed": nil, - "rebroadcastwinners": nil, "searchrawtransactions": {(*string)(nil), (*[]types.SearchRawTransactionsResult)(nil)}, "sendrawtransaction": {(*string)(nil)}, "setgenerate": nil, @@ -1048,7 +1046,6 @@ var rpcResultTypes = map[types.Method][]interface{}{ // Websocket commands. "loadtxfilter": nil, - "session": {(*types.SessionResult)(nil)}, "notifywinningtickets": nil, "notifyspentandmissedtickets": nil, "notifynewtickets": nil, @@ -1057,7 +1054,10 @@ var rpcResultTypes = map[types.Method][]interface{}{ "notifynewtransactions": nil, "notifyreceived": nil, "notifyspent": nil, + "rebroadcastmissed": nil, + "rebroadcastwinners": nil, "rescan": nil, + "session": {(*types.SessionResult)(nil)}, "stopnotifyblocks": nil, "stopnotifynewtransactions": nil, "stopnotifyreceived": nil, diff --git a/rpcwebsocket.go b/rpcwebsocket.go index e70419f1..a2775dd5 100644 --- a/rpcwebsocket.go +++ b/rpcwebsocket.go @@ -64,6 +64,7 @@ type wsCommandHandler func(*wsClient, interface{}) (interface{}, error) // causes a dependency loop. var wsHandlers map[types.Method]wsCommandHandler var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ + "help": handleWebsocketHelp, "loadtxfilter": handleLoadTxFilter, "notifyblocks": handleNotifyBlocks, "notifywinningtickets": handleWinningTickets, @@ -71,9 +72,10 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{ "notifynewtickets": handleNewTickets, "notifystakedifficulty": handleStakeDifficulty, "notifynewtransactions": handleNotifyNewTransactions, - "session": handleSession, - "help": handleWebsocketHelp, + "rebroadcastmissed": handleRebroadcastMissed, + "rebroadcastwinners": handleRebroadcastWinners, "rescan": handleRescan, + "session": handleSession, "stopnotifyblocks": handleStopNotifyBlocks, "stopnotifynewtransactions": handleStopNotifyNewTransactions, } @@ -1173,7 +1175,7 @@ type wsClient struct { sync.Mutex // server is the RPC server that is servicing the client. - server *rpcServer + rpcServer *rpcServer // conn is the underlying websocket connection. conn *websocket.Conn @@ -1326,8 +1328,8 @@ out: login := authCmd.Username + ":" + authCmd.Passphrase auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) authSha := sha256.Sum256([]byte(auth)) - cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:]) - limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:]) + cmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.authsha[:]) + limitcmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.limitauthsha[:]) if cmp != 1 && limitcmp != 1 { rpcsLog.Warnf("Auth failure.") break out @@ -1563,8 +1565,8 @@ out: login := authCmd.Username + ":" + authCmd.Passphrase auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) authSha := sha256.Sum256([]byte(auth)) - cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:]) - limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:]) + cmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.authsha[:]) + limitcmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.limitauthsha[:]) if cmp != 1 && limitcmp != 1 { rpcsLog.Warnf("Auth failure.") break out @@ -1617,7 +1619,7 @@ out: if ok { resp, err = wsHandler(c, cmd.params) } else { - resp, err = c.server.standardCmdResult(cmd, nil) + resp, err = c.rpcServer.standardCmdResult(cmd, nil) } // Marshal request output. @@ -1688,7 +1690,7 @@ func (c *wsClient) serviceRequest(r *parsedRPCCmd) { if ok { result, err = wsHandler(c, r.params) } else { - result, err = c.server.standardCmdResult(r, nil) + result, err = c.rpcServer.standardCmdResult(r, nil) } reply, err := createMarshalledReply(r.jsonrpc, r.id, result, err) if err != nil { @@ -1918,7 +1920,7 @@ func newWebsocketClient(server *rpcServer, conn *websocket.Conn, authenticated: authenticated, isAdmin: isAdmin, sessionID: sessionID, - server: server, + rpcServer: server, serviceRequestSem: makeSemaphore(cfg.RPCMaxConcurrentReqs), ntfnChan: make(chan []byte, 1), // nonblocking sync sendChan: make(chan wsResponse, websocketSendBufferSize), @@ -1941,7 +1943,7 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) { method = types.Method(*cmd.Command) } if method == "" { - usage, err := wsc.server.helpCacher.rpcUsage(true) + usage, err := wsc.rpcServer.helpCacher.rpcUsage(true) if err != nil { context := "Failed to generate RPC usage" return nil, rpcInternalError(err.Error(), context) @@ -1964,7 +1966,7 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) { } // Get the help for the command. - help, err := wsc.server.helpCacher.rpcMethodHelp(method) + help, err := wsc.rpcServer.helpCacher.rpcMethodHelp(method) if err != nil { context := "Failed to generate help" return nil, rpcInternalError(err.Error(), context) @@ -1996,7 +1998,7 @@ func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) { wsc.Lock() if cmd.Reload || wsc.filterData == nil { wsc.filterData = makeWSClientFilter(cmd.Addresses, outPoints, - wsc.server.server.chainParams) + wsc.rpcServer.server.chainParams) wsc.Unlock() } else { filter := wsc.filterData @@ -2018,7 +2020,64 @@ func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) { // handleNotifyBlocks implements the notifyblocks command extension for // websocket connections. func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.RegisterBlockUpdates(wsc) + wsc.rpcServer.ntfnMgr.RegisterBlockUpdates(wsc) + return nil, nil +} + +// handleRebroadcastMissed implements the rebroadcastmissed command. +func handleRebroadcastMissed(wsc *wsClient, icmd interface{}) (interface{}, error) { + best := wsc.rpcServer.chain.BestSnapshot() + mt, err := wsc.rpcServer.chain.MissedTickets() + if err != nil { + return nil, rpcInternalError("Could not get missed tickets "+ + err.Error(), "") + } + + stakeDiff, err := wsc.rpcServer.server.blockManager.CalcNextRequiredStakeDifficulty() + if err != nil { + return nil, rpcInternalError("Could not calculate next stake "+ + "difficulty "+err.Error(), "") + } + + missedTicketsNtfn := &blockchain.TicketNotificationsData{ + Hash: best.Hash, + Height: best.Height, + StakeDifficulty: stakeDiff, + TicketsSpent: []chainhash.Hash{}, + TicketsMissed: mt, + TicketsNew: []chainhash.Hash{}, + } + + wsc.rpcServer.ntfnMgr.NotifySpentAndMissedTickets(missedTicketsNtfn) + + return nil, nil +} + +// handleRebroadcastWinners implements the rebroadcastwinners command. +func handleRebroadcastWinners(wsc *wsClient, icmd interface{}) (interface{}, error) { + bestHeight := wsc.rpcServer.chain.BestSnapshot().Height + blocks, err := wsc.rpcServer.server.blockManager.TipGeneration() + if err != nil { + return nil, rpcInternalError("Could not get generation "+ + err.Error(), "") + } + + for i := range blocks { + winningTickets, _, _, err := + wsc.rpcServer.chain.LotteryDataForBlock(&blocks[i]) + if err != nil { + return nil, rpcInternalError("Lottery data for block "+ + "failed: "+err.Error(), "") + } + ntfnData := &WinningTicketsNtfnData{ + BlockHash: blocks[i], + BlockHeight: bestHeight, + Tickets: winningTickets, + } + + wsc.rpcServer.ntfnMgr.NotifyWinningTickets(ntfnData) + } + return nil, nil } @@ -2031,35 +2090,35 @@ func handleSession(wsc *wsClient, icmd interface{}) (interface{}, error) { // handleWinningTickets implements the notifywinningtickets command // extension for websocket connections. func handleWinningTickets(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.RegisterWinningTickets(wsc) + wsc.rpcServer.ntfnMgr.RegisterWinningTickets(wsc) return nil, nil } // handleSpentAndMissedTickets implements the notifyspentandmissedtickets command // extension for websocket connections. func handleSpentAndMissedTickets(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.RegisterSpentAndMissedTickets(wsc) + wsc.rpcServer.ntfnMgr.RegisterSpentAndMissedTickets(wsc) return nil, nil } // handleNewTickets implements the notifynewtickets command extension for // websocket connections. func handleNewTickets(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.RegisterNewTickets(wsc) + wsc.rpcServer.ntfnMgr.RegisterNewTickets(wsc) return nil, nil } // handleStakeDifficulty implements the notifystakedifficulty command extension // for websocket connections. func handleStakeDifficulty(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.RegisterStakeDifficulty(wsc) + wsc.rpcServer.ntfnMgr.RegisterStakeDifficulty(wsc) return nil, nil } // handleStopNotifyBlocks implements the stopnotifyblocks command extension for // websocket connections. func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.UnregisterBlockUpdates(wsc) + wsc.rpcServer.ntfnMgr.UnregisterBlockUpdates(wsc) return nil, nil } @@ -2072,14 +2131,14 @@ func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, } wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose - wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) + wsc.rpcServer.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc) return nil, nil } // handleStopNotifyNewTransations implements the stopnotifynewtransactions // command extension for websocket connections. func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) { - wsc.server.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc) + wsc.rpcServer.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc) return nil, nil } @@ -2194,7 +2253,7 @@ func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) { // Iterate over each block in the request and rescan. When a block // contains relevant transactions, add it to the response. - bc := wsc.server.chain + bc := wsc.rpcServer.chain var lastBlockHash *chainhash.Hash for i := range blockHashes { block, err := bc.BlockByHash(&blockHashes[i])