multi: add specialized rebroadcast handling for stake txs.

This commit is contained in:
Donald Adu-Poku 2018-07-06 00:56:28 +00:00 committed by Dave Collins
parent d3199d2aa9
commit 9b08b582d7
3 changed files with 81 additions and 13 deletions

View File

@ -1101,13 +1101,13 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) {
// Retrieve the current previous block hash.
curPrevHash := b.chain.BestPrevHash()
nextStakeDiff, errSDiff :=
nextStakeDiff, err :=
b.chain.CalcNextRequiredStakeDifficulty()
if errSDiff != nil {
if err != nil {
bmgrLog.Warnf("Failed to get next stake difficulty "+
"calculation: %v", err)
}
if r != nil && errSDiff == nil {
if r != nil && err == nil {
// Update registered websocket clients on the
// current stake difficulty.
r.ntfnMgr.NotifyStakeDifficulty(
@ -2020,6 +2020,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.server.RemoveRebroadcastInventory(iv)
}
// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()
// Notify registered websocket clients of incoming block.
r.ntfnMgr.NotifyBlockConnected(block)
}
@ -2103,6 +2106,9 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
}
}
// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()
// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
r.ntfnMgr.NotifyBlockDisconnected(block)

View File

@ -5081,16 +5081,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
//
// Note that votes are only valid for a specific block and are time
// sensitive, so they should not be added to the rebroadcast logic.
//
// TODO: Ideally ticket purchases and revocations could be added to the
// rebroadcast logic as well, however, they would need to be removed under
// certain circumstances such as when the stake difficulty interval changes
// and if a revocation is for a ticket that was missed, but then becomes
// live again due to a reorg. All stake transactions are ignored here since
// there is no clean infrastructure in place currently to handle those
// removals and perpetually broadcasting transactions which are no longer
// valid is not desirable.
if txType := stake.DetermineTxType(msgtx); txType == stake.TxTypeRegular {
if txType := stake.DetermineTxType(msgtx); txType != stake.TxTypeSSGen {
iv := wire.NewInvVect(wire.InvTypeTx, tx.Hash())
s.server.AddRebroadcastInventory(iv, tx)
}

View File

@ -22,6 +22,7 @@ import (
"github.com/decred/dcrd/addrmgr"
"github.com/decred/dcrd/blockchain"
"github.com/decred/dcrd/blockchain/indexers"
"github.com/decred/dcrd/blockchain/stake"
"github.com/decred/dcrd/chaincfg"
"github.com/decred/dcrd/chaincfg/chainhash"
"github.com/decred/dcrd/connmgr"
@ -83,6 +84,10 @@ type broadcastInventoryAdd relayMsg
// needs to be removed from the rebroadcast map
type broadcastInventoryDel *wire.InvVect
// broadcastPruneInventory is a type used to declare that rebroadcast
// inventory entries need to be filtered and removed where necessary
type broadcastPruneInventory struct{}
// relayMsg packages an inventory vector along with the newly discovered
// inventory so the relay has access to that information.
type relayMsg struct {
@ -1095,6 +1100,17 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}
// PruneRebroadcastInventory filters and removes rebroadcast inventory entries
// where necessary.
func (s *server) PruneRebroadcastInventory() {
// Ignore if shutting down.
if atomic.LoadInt32(&s.shutdown) != 0 {
return
}
s.modifyRebroadcastInv <- broadcastPruneInventory{}
}
// AnnounceNewTransactions generates and relays inventory vectors and notifies
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
@ -1910,8 +1926,10 @@ func (s *server) rebroadcastHandler() {
out:
for {
select {
case riv := <-s.modifyRebroadcastInv:
switch msg := riv.(type) {
// Incoming InvVects are added to our map of RPC txs.
case broadcastInventoryAdd:
pendingInvs[*msg.invVect] = msg.data
@ -1922,6 +1940,59 @@ out:
if _, ok := pendingInvs[*msg]; ok {
delete(pendingInvs, *msg)
}
case broadcastPruneInventory:
best := s.blockManager.chain.BestSnapshot()
nextStakeDiff, err :=
s.blockManager.chain.CalcNextRequiredStakeDifficulty()
if err != nil {
srvrLog.Errorf("Failed to get next stake difficulty: %v",
err)
break
}
for iv, data := range pendingInvs {
tx, ok := data.(*dcrutil.Tx)
if !ok {
continue
}
txType := stake.DetermineTxType(tx.MsgTx())
// Remove the ticket rebroadcast if the amount not equal to
// the current stake difficulty.
if txType == stake.TxTypeSStx &&
tx.MsgTx().TxOut[0].Value != nextStakeDiff {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending ticket purchase broadcast "+
"inventory for tx %v removed. Ticket value not "+
"equal to stake difficulty.", tx.Hash())
continue
}
// Remove the ticket rebroadcast if it has already expired.
if txType == stake.TxTypeSStx &&
blockchain.IsExpired(tx, best.Height) {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending ticket purchase broadcast "+
"inventory for tx %v removed. Transaction "+
"expired.", tx.Hash())
continue
}
// Remove the revocation rebroadcast if the associated
// ticket has been revived.
if txType == stake.TxTypeSSRtx {
refSStxHash := tx.MsgTx().TxIn[0].PreviousOutPoint.Hash
if !s.blockManager.chain.CheckLiveTicket(refSStxHash) {
delete(pendingInvs, iv)
srvrLog.Debugf("Pending revocation broadcast "+
"inventory for tx %v removed. "+
"Associated ticket was revived.", tx.Hash())
continue
}
}
}
}
case <-timer.C: