multi: make rebroadcast winners & missed ws only.

This commit is contained in:
Donald Adu-Poku 2019-09-03 12:00:47 +00:00 committed by Dave Collins
parent baac7efb44
commit a80134fa29
3 changed files with 84 additions and 88 deletions

View File

@ -226,8 +226,6 @@ var rpcHandlersBeforeInit = map[types.Method]commandHandler{
"missedtickets": handleMissedTickets,
"node": handleNode,
"ping": handlePing,
"rebroadcastmissed": handleRebroadcastMissed,
"rebroadcastwinners": handleRebroadcastWinners,
"searchrawtransactions": handleSearchRawTransactions,
"sendrawtransaction": handleSendRawTransaction,
"setgenerate": handleSetGenerate,
@ -4496,67 +4494,6 @@ func handlePing(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (inter
return nil, nil
}
// handleRebroadcastMissed implements the rebroadcastmissed command.
//
// TODO make this a websocket only command.
func handleRebroadcastMissed(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
best := s.server.chain.BestSnapshot()
mt, err := s.server.chain.MissedTickets()
if err != nil {
return nil, rpcInternalError("Could not get missed tickets "+
err.Error(), "")
}
stakeDiff, err := s.server.blockManager.CalcNextRequiredStakeDifficulty()
if err != nil {
return nil, rpcInternalError("Could not calculate next stake "+
"difficulty "+err.Error(), "")
}
missedTicketsNtfn := &blockchain.TicketNotificationsData{
Hash: best.Hash,
Height: best.Height,
StakeDifficulty: stakeDiff,
TicketsSpent: []chainhash.Hash{},
TicketsMissed: mt,
TicketsNew: []chainhash.Hash{},
}
s.ntfnMgr.NotifySpentAndMissedTickets(missedTicketsNtfn)
return nil, nil
}
// handleRebroadcastWinners implements the rebroadcastwinners command.
//
// TODO make this a websocket only command.
func handleRebroadcastWinners(s *rpcServer, cmd interface{}, closeChan <-chan struct{}) (interface{}, error) {
bestHeight := s.server.chain.BestSnapshot().Height
blocks, err := s.server.blockManager.TipGeneration()
if err != nil {
return nil, rpcInternalError("Could not get generation "+
err.Error(), "")
}
for i := range blocks {
winningTickets, _, _, err :=
s.server.chain.LotteryDataForBlock(&blocks[i])
if err != nil {
return nil, rpcInternalError("Lottery data for block "+
"failed: "+err.Error(), "")
}
ntfnData := &WinningTicketsNtfnData{
BlockHash: blocks[i],
BlockHeight: bestHeight,
Tickets: winningTickets,
}
s.ntfnMgr.NotifyWinningTickets(ntfnData)
}
return nil, nil
}
// retrievedTx represents a transaction that was either loaded from the
// transaction memory pool or from the database. When a transaction is loaded
// from the database, it is loaded with the raw serialized bytes while the

View File

@ -1030,8 +1030,6 @@ var rpcResultTypes = map[types.Method][]interface{}{
"missedtickets": {(*types.MissedTicketsResult)(nil)},
"node": nil,
"ping": nil,
"rebroadcastmissed": nil,
"rebroadcastwinners": nil,
"searchrawtransactions": {(*string)(nil), (*[]types.SearchRawTransactionsResult)(nil)},
"sendrawtransaction": {(*string)(nil)},
"setgenerate": nil,
@ -1048,7 +1046,6 @@ var rpcResultTypes = map[types.Method][]interface{}{
// Websocket commands.
"loadtxfilter": nil,
"session": {(*types.SessionResult)(nil)},
"notifywinningtickets": nil,
"notifyspentandmissedtickets": nil,
"notifynewtickets": nil,
@ -1057,7 +1054,10 @@ var rpcResultTypes = map[types.Method][]interface{}{
"notifynewtransactions": nil,
"notifyreceived": nil,
"notifyspent": nil,
"rebroadcastmissed": nil,
"rebroadcastwinners": nil,
"rescan": nil,
"session": {(*types.SessionResult)(nil)},
"stopnotifyblocks": nil,
"stopnotifynewtransactions": nil,
"stopnotifyreceived": nil,

View File

@ -64,6 +64,7 @@ type wsCommandHandler func(*wsClient, interface{}) (interface{}, error)
// causes a dependency loop.
var wsHandlers map[types.Method]wsCommandHandler
var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{
"help": handleWebsocketHelp,
"loadtxfilter": handleLoadTxFilter,
"notifyblocks": handleNotifyBlocks,
"notifywinningtickets": handleWinningTickets,
@ -71,9 +72,10 @@ var wsHandlersBeforeInit = map[types.Method]wsCommandHandler{
"notifynewtickets": handleNewTickets,
"notifystakedifficulty": handleStakeDifficulty,
"notifynewtransactions": handleNotifyNewTransactions,
"session": handleSession,
"help": handleWebsocketHelp,
"rebroadcastmissed": handleRebroadcastMissed,
"rebroadcastwinners": handleRebroadcastWinners,
"rescan": handleRescan,
"session": handleSession,
"stopnotifyblocks": handleStopNotifyBlocks,
"stopnotifynewtransactions": handleStopNotifyNewTransactions,
}
@ -1173,7 +1175,7 @@ type wsClient struct {
sync.Mutex
// server is the RPC server that is servicing the client.
server *rpcServer
rpcServer *rpcServer
// conn is the underlying websocket connection.
conn *websocket.Conn
@ -1326,8 +1328,8 @@ out:
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := sha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
cmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
break out
@ -1563,8 +1565,8 @@ out:
login := authCmd.Username + ":" + authCmd.Passphrase
auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login))
authSha := sha256.Sum256([]byte(auth))
cmp := subtle.ConstantTimeCompare(authSha[:], c.server.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.server.limitauthsha[:])
cmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.authsha[:])
limitcmp := subtle.ConstantTimeCompare(authSha[:], c.rpcServer.limitauthsha[:])
if cmp != 1 && limitcmp != 1 {
rpcsLog.Warnf("Auth failure.")
break out
@ -1617,7 +1619,7 @@ out:
if ok {
resp, err = wsHandler(c, cmd.params)
} else {
resp, err = c.server.standardCmdResult(cmd, nil)
resp, err = c.rpcServer.standardCmdResult(cmd, nil)
}
// Marshal request output.
@ -1688,7 +1690,7 @@ func (c *wsClient) serviceRequest(r *parsedRPCCmd) {
if ok {
result, err = wsHandler(c, r.params)
} else {
result, err = c.server.standardCmdResult(r, nil)
result, err = c.rpcServer.standardCmdResult(r, nil)
}
reply, err := createMarshalledReply(r.jsonrpc, r.id, result, err)
if err != nil {
@ -1918,7 +1920,7 @@ func newWebsocketClient(server *rpcServer, conn *websocket.Conn,
authenticated: authenticated,
isAdmin: isAdmin,
sessionID: sessionID,
server: server,
rpcServer: server,
serviceRequestSem: makeSemaphore(cfg.RPCMaxConcurrentReqs),
ntfnChan: make(chan []byte, 1), // nonblocking sync
sendChan: make(chan wsResponse, websocketSendBufferSize),
@ -1941,7 +1943,7 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) {
method = types.Method(*cmd.Command)
}
if method == "" {
usage, err := wsc.server.helpCacher.rpcUsage(true)
usage, err := wsc.rpcServer.helpCacher.rpcUsage(true)
if err != nil {
context := "Failed to generate RPC usage"
return nil, rpcInternalError(err.Error(), context)
@ -1964,7 +1966,7 @@ func handleWebsocketHelp(wsc *wsClient, icmd interface{}) (interface{}, error) {
}
// Get the help for the command.
help, err := wsc.server.helpCacher.rpcMethodHelp(method)
help, err := wsc.rpcServer.helpCacher.rpcMethodHelp(method)
if err != nil {
context := "Failed to generate help"
return nil, rpcInternalError(err.Error(), context)
@ -1996,7 +1998,7 @@ func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.Lock()
if cmd.Reload || wsc.filterData == nil {
wsc.filterData = makeWSClientFilter(cmd.Addresses, outPoints,
wsc.server.server.chainParams)
wsc.rpcServer.server.chainParams)
wsc.Unlock()
} else {
filter := wsc.filterData
@ -2018,7 +2020,64 @@ func handleLoadTxFilter(wsc *wsClient, icmd interface{}) (interface{}, error) {
// handleNotifyBlocks implements the notifyblocks command extension for
// websocket connections.
func handleNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.RegisterBlockUpdates(wsc)
wsc.rpcServer.ntfnMgr.RegisterBlockUpdates(wsc)
return nil, nil
}
// handleRebroadcastMissed implements the rebroadcastmissed command.
func handleRebroadcastMissed(wsc *wsClient, icmd interface{}) (interface{}, error) {
best := wsc.rpcServer.chain.BestSnapshot()
mt, err := wsc.rpcServer.chain.MissedTickets()
if err != nil {
return nil, rpcInternalError("Could not get missed tickets "+
err.Error(), "")
}
stakeDiff, err := wsc.rpcServer.server.blockManager.CalcNextRequiredStakeDifficulty()
if err != nil {
return nil, rpcInternalError("Could not calculate next stake "+
"difficulty "+err.Error(), "")
}
missedTicketsNtfn := &blockchain.TicketNotificationsData{
Hash: best.Hash,
Height: best.Height,
StakeDifficulty: stakeDiff,
TicketsSpent: []chainhash.Hash{},
TicketsMissed: mt,
TicketsNew: []chainhash.Hash{},
}
wsc.rpcServer.ntfnMgr.NotifySpentAndMissedTickets(missedTicketsNtfn)
return nil, nil
}
// handleRebroadcastWinners implements the rebroadcastwinners command.
func handleRebroadcastWinners(wsc *wsClient, icmd interface{}) (interface{}, error) {
bestHeight := wsc.rpcServer.chain.BestSnapshot().Height
blocks, err := wsc.rpcServer.server.blockManager.TipGeneration()
if err != nil {
return nil, rpcInternalError("Could not get generation "+
err.Error(), "")
}
for i := range blocks {
winningTickets, _, _, err :=
wsc.rpcServer.chain.LotteryDataForBlock(&blocks[i])
if err != nil {
return nil, rpcInternalError("Lottery data for block "+
"failed: "+err.Error(), "")
}
ntfnData := &WinningTicketsNtfnData{
BlockHash: blocks[i],
BlockHeight: bestHeight,
Tickets: winningTickets,
}
wsc.rpcServer.ntfnMgr.NotifyWinningTickets(ntfnData)
}
return nil, nil
}
@ -2031,35 +2090,35 @@ func handleSession(wsc *wsClient, icmd interface{}) (interface{}, error) {
// handleWinningTickets implements the notifywinningtickets command
// extension for websocket connections.
func handleWinningTickets(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.RegisterWinningTickets(wsc)
wsc.rpcServer.ntfnMgr.RegisterWinningTickets(wsc)
return nil, nil
}
// handleSpentAndMissedTickets implements the notifyspentandmissedtickets command
// extension for websocket connections.
func handleSpentAndMissedTickets(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.RegisterSpentAndMissedTickets(wsc)
wsc.rpcServer.ntfnMgr.RegisterSpentAndMissedTickets(wsc)
return nil, nil
}
// handleNewTickets implements the notifynewtickets command extension for
// websocket connections.
func handleNewTickets(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.RegisterNewTickets(wsc)
wsc.rpcServer.ntfnMgr.RegisterNewTickets(wsc)
return nil, nil
}
// handleStakeDifficulty implements the notifystakedifficulty command extension
// for websocket connections.
func handleStakeDifficulty(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.RegisterStakeDifficulty(wsc)
wsc.rpcServer.ntfnMgr.RegisterStakeDifficulty(wsc)
return nil, nil
}
// handleStopNotifyBlocks implements the stopnotifyblocks command extension for
// websocket connections.
func handleStopNotifyBlocks(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.UnregisterBlockUpdates(wsc)
wsc.rpcServer.ntfnMgr.UnregisterBlockUpdates(wsc)
return nil, nil
}
@ -2072,14 +2131,14 @@ func handleNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{},
}
wsc.verboseTxUpdates = cmd.Verbose != nil && *cmd.Verbose
wsc.server.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc)
wsc.rpcServer.ntfnMgr.RegisterNewMempoolTxsUpdates(wsc)
return nil, nil
}
// handleStopNotifyNewTransations implements the stopnotifynewtransactions
// command extension for websocket connections.
func handleStopNotifyNewTransactions(wsc *wsClient, icmd interface{}) (interface{}, error) {
wsc.server.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc)
wsc.rpcServer.ntfnMgr.UnregisterNewMempoolTxsUpdates(wsc)
return nil, nil
}
@ -2194,7 +2253,7 @@ func handleRescan(wsc *wsClient, icmd interface{}) (interface{}, error) {
// Iterate over each block in the request and rescan. When a block
// contains relevant transactions, add it to the response.
bc := wsc.server.chain
bc := wsc.rpcServer.chain
var lastBlockHash *chainhash.Hash
for i := range blockHashes {
block, err := bc.BlockByHash(&blockHashes[i])