From 9b08b582d76869b7e16177fc0f45dd926c0eab31 Mon Sep 17 00:00:00 2001 From: Donald Adu-Poku Date: Fri, 6 Jul 2018 00:56:28 +0000 Subject: [PATCH] multi: add specialized rebroadcast handling for stake txs. --- blockmanager.go | 12 ++++++--- rpcserver.go | 11 +------- server.go | 71 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 13 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index 1f409610..767ccfdf 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1101,13 +1101,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Retrieve the current previous block hash. curPrevHash := b.chain.BestPrevHash() - nextStakeDiff, errSDiff := + nextStakeDiff, err := b.chain.CalcNextRequiredStakeDifficulty() - if errSDiff != nil { + if err != nil { bmgrLog.Warnf("Failed to get next stake difficulty "+ "calculation: %v", err) } - if r != nil && errSDiff == nil { + if r != nil && err == nil { // Update registered websocket clients on the // current stake difficulty. r.ntfnMgr.NotifyStakeDifficulty( @@ -2020,6 +2020,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { b.server.RemoveRebroadcastInventory(iv) } + // Filter and update the rebroadcast inventory. + b.server.PruneRebroadcastInventory() + // Notify registered websocket clients of incoming block. r.ntfnMgr.NotifyBlockConnected(block) } @@ -2103,6 +2106,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { } } + // Filter and update the rebroadcast inventory. + b.server.PruneRebroadcastInventory() + // Notify registered websocket clients. if r := b.server.rpcServer; r != nil { r.ntfnMgr.NotifyBlockDisconnected(block) diff --git a/rpcserver.go b/rpcserver.go index 2b1fdffc..84fbc43e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -5081,16 +5081,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st // // Note that votes are only valid for a specific block and are time // sensitive, so they should not be added to the rebroadcast logic. - // - // TODO: Ideally ticket purchases and revocations could be added to the - // rebroadcast logic as well, however, they would need to be removed under - // certain circumstances such as when the stake difficulty interval changes - // and if a revocation is for a ticket that was missed, but then becomes - // live again due to a reorg. All stake transactions are ignored here since - // there is no clean infrastructure in place currently to handle those - // removals and perpetually broadcasting transactions which are no longer - // valid is not desirable. - if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular { + if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen { iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash()) s.server.AddRebroadcastInventory(iv, tx) } diff --git a/server.go b/server.go index d98a3ac6..13719357 100644 --- a/server.go +++ b/server.go @@ -22,6 +22,7 @@ import ( "github.com/decred/dcrd/addrmgr" "github.com/decred/dcrd/blockchain" "github.com/decred/dcrd/blockchain/indexers" + "github.com/decred/dcrd/blockchain/stake" "github.com/decred/dcrd/chaincfg" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/connmgr" @@ -83,6 +84,10 @@ type broadcastInventoryAdd relayMsg // needs to be removed from the rebroadcast map type broadcastInventoryDel *wire.InvVect +// broadcastPruneInventory is a type used to declare that rebroadcast +// inventory entries need to be filtered and removed where necessary +type broadcastPruneInventory struct{} + // relayMsg packages an inventory vector along with the newly discovered // inventory so the relay has access to that information. type relayMsg struct { @@ -1095,6 +1100,17 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { s.modifyRebroadcastInv <- broadcastInventoryDel(iv) } +// PruneRebroadcastInventory filters and removes rebroadcast inventory entries +// where necessary. +func (s *server) PruneRebroadcastInventory() { + // Ignore if shutting down. + if atomic.LoadInt32(&s.shutdown) != 0 { + return + } + + s.modifyRebroadcastInv <- broadcastPruneInventory{} +} + // AnnounceNewTransactions generates and relays inventory vectors and notifies // both websocket and getblocktemplate long poll clients of the passed // transactions. This function should be called whenever new transactions @@ -1910,8 +1926,10 @@ func (s *server) rebroadcastHandler() { out: for { select { + case riv := <-s.modifyRebroadcastInv: switch msg := riv.(type) { + // Incoming InvVects are added to our map of RPC txs. case broadcastInventoryAdd: pendingInvs[*msg.invVect] = msg.data @@ -1922,6 +1940,59 @@ out: if _, ok := pendingInvs[*msg]; ok { delete(pendingInvs, *msg) } + + case broadcastPruneInventory: + best := s.blockManager.chain.BestSnapshot() + nextStakeDiff, err := + s.blockManager.chain.CalcNextRequiredStakeDifficulty() + if err != nil { + srvrLog.Errorf("Failed to get next stake difficulty: %v", + err) + break + } + + for iv, data := range pendingInvs { + tx, ok := data.(*dcrutil.Tx) + if !ok { + continue + } + + txType := stake.DetermineTxType(tx.MsgTx()) + + // Remove the ticket rebroadcast if the amount not equal to + // the current stake difficulty. + if txType == stake.TxTypeSStx && + tx.MsgTx().TxOut[0].Value != nextStakeDiff { + delete(pendingInvs, iv) + srvrLog.Debugf("Pending ticket purchase broadcast "+ + "inventory for tx %v removed. Ticket value not "+ + "equal to stake difficulty.", tx.Hash()) + continue + } + + // Remove the ticket rebroadcast if it has already expired. + if txType == stake.TxTypeSStx && + blockchain.IsExpired(tx, best.Height) { + delete(pendingInvs, iv) + srvrLog.Debugf("Pending ticket purchase broadcast "+ + "inventory for tx %v removed. Transaction "+ + "expired.", tx.Hash()) + continue + } + + // Remove the revocation rebroadcast if the associated + // ticket has been revived. + if txType == stake.TxTypeSSRtx { + refSStxHash := tx.MsgTx().TxIn[0].PreviousOutPoint.Hash + if !s.blockManager.chain.CheckLiveTicket(refSStxHash) { + delete(pendingInvs, iv) + srvrLog.Debugf("Pending revocation broadcast "+ + "inventory for tx %v removed. "+ + "Associated ticket was revived.", tx.Hash()) + continue + } + } + } } case <-timer.C: