mempool: Have ProcessTransaction return accepted transactions. (#547)

It is not the responsibility of mempool to relay transactions, so
return a slice of transactions accepted to the mempool due to the
passed transaction to the caller.
This commit is contained in:
David Hill 2016-04-14 13:58:09 -04:00 committed by cjepson
parent 65999dc171
commit 85835a3e1f
4 changed files with 86 additions and 49 deletions

View File

@ -281,7 +281,8 @@ type processBlockMsg struct {
// processTransactionResponse is a response sent to the reply channel of a
// processTransactionMsg.
type processTransactionResponse struct {
err error
acceptedTxs []*dcrutil.Tx
err error
}
// processTransactionMsg is a message type to be sent across the message
@ -910,7 +911,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
// Process the transaction to include validation, insertion in the
// memory pool, orphan handling, etc.
allowOrphans := cfg.MaxOrphanTxs > 0
err := tmsg.peer.server.txMemPool.ProcessTransaction(tmsg.tx,
acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx,
allowOrphans, true, true)
// Remove transaction from request maps. Either the mempool/chain
@ -941,6 +942,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) {
false)
return
}
b.server.AnnounceNewTransactions(acceptedTxs)
}
// current returns true if we believe we are synced with our peers, false if we
@ -2072,10 +2075,11 @@ out:
}
case processTransactionMsg:
err := b.server.txMemPool.ProcessTransaction(msg.tx,
acceptedTxs, err := b.server.txMemPool.ProcessTransaction(msg.tx,
msg.allowOrphans, msg.rateLimit, msg.allowHighFees)
msg.reply <- processTransactionResponse{
err: err,
acceptedTxs: acceptedTxs,
err: err,
}
case isCurrentMsg:
@ -2278,7 +2282,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
b.server.txMemPool.RemoveTransaction(tx, false)
b.server.txMemPool.RemoveDoubleSpends(tx)
b.server.txMemPool.RemoveOrphan(tx.Sha())
b.server.txMemPool.ProcessOrphans(tx.Sha())
acceptedTxs := b.server.txMemPool.ProcessOrphans(tx.Sha())
b.server.AnnounceNewTransactions(acceptedTxs)
}
for _, stx := range block.STransactions()[0:] {
@ -2753,11 +2758,12 @@ func (b *blockManager) ProcessBlock(block *dcrutil.Block,
// a block chain. It is funneled through the block manager since blockchain is
// not safe for concurrent access.
func (b *blockManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool,
rateLimit bool, allowHighFees bool) error {
rateLimit bool, allowHighFees bool) ([]*dcrutil.Tx, error) {
reply := make(chan processTransactionResponse, 1)
b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, allowHighFees, reply}
b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit,
allowHighFees, reply}
response := <-reply
return response.err
return response.acceptedTxs, response.err
}
// FetchTransactionStore makes use of FetchTransactionStore on an internal

View File

@ -6,7 +6,6 @@
package main
import (
"bytes"
"container/list"
"crypto/rand"
"fmt"
@ -74,7 +73,6 @@ const (
// in a multi-signature transaction output script for it to be
// considered standard.
maxStandardMultiSigKeys = 3
// minTxRelayFeeMainNet is the minimum fee in atoms that is required for a
// transaction to be treated as free for relay and mining purposes. It
// is also used to help determine if a transaction is considered dust
@ -1602,7 +1600,9 @@ func (mp *txMemPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew,
// ProcessOrphans. See the comment for ProcessOrphans for more details.
//
// This function MUST be called with the mempool lock held (for writes).
func (mp *txMemPool) processOrphans(hash *chainhash.Hash) {
func (mp *txMemPool) processOrphans(hash *chainhash.Hash) []*dcrutil.Tx {
var acceptedTxns []*dcrutil.Tx
// Start with processing at least the passed hash.
processHashes := list.New()
processHashes.PushBack(hash)
@ -1661,10 +1661,9 @@ func (mp *txMemPool) processOrphans(hash *chainhash.Hash) {
continue
}
// Generate and relay the inventory vector for the
// newly accepted transaction.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())
mp.server.RelayInventory(iv, tx)
// Add this transaction to the list of transactions
// that are no longer orphans.
acceptedTxns = append(acceptedTxns, tx)
// Add this transaction to the list of transactions to
// process so any orphans that depend on this one are
@ -1682,6 +1681,8 @@ func (mp *txMemPool) processOrphans(hash *chainhash.Hash) {
processHashes.PushBack(orphanHash)
}
}
return acceptedTxns
}
// PruneStakeTx is the function which is called everytime a new block is
@ -1744,11 +1745,16 @@ func (mp *txMemPool) pruneExpiredTx(height int64) {
// newly accepted transactions (to detect further orphans which may no longer be
// orphans) until there are no more.
//
// It returns a slice of transactions added to the mempool. A nil slice means
// no transactions were moved from the orphan pool to the mempool.
//
// This function is safe for concurrent access.
func (mp *txMemPool) ProcessOrphans(hash *chainhash.Hash) {
func (mp *txMemPool) ProcessOrphans(hash *chainhash.Hash) []*dcrutil.Tx {
mp.Lock()
mp.processOrphans(hash)
acceptedTxns := mp.processOrphans(hash)
mp.Unlock()
return acceptedTxns
}
// ProcessTransaction is the main workhorse for handling insertion of new
@ -1756,9 +1762,14 @@ func (mp *txMemPool) ProcessOrphans(hash *chainhash.Hash) {
// such as rejecting duplicate transactions, ensuring transactions follow all
// rules, orphan transaction handling, and insertion into the memory pool.
//
// It returns a slice of transactions added to the mempool. When the
// error is nil, the list will include the passed transaction itself along
// with any additional orphan transaactions that were added as a result of
// the passed one being accepted.
//
// This function is safe for concurrent access.
func (mp *txMemPool) ProcessTransaction(tx *dcrutil.Tx, allowOrphan,
rateLimit, allowHighFees bool) error {
rateLimit, allowHighFees bool) ([]*dcrutil.Tx, error) {
// Protect concurrent access.
mp.Lock()
defer mp.Unlock()
@ -1768,59 +1779,50 @@ func (mp *txMemPool) ProcessTransaction(tx *dcrutil.Tx, allowOrphan,
// Potentially accept the transaction to the memory pool.
missingParents, err := mp.maybeAcceptTransaction(tx, true, rateLimit, allowHighFees)
if err != nil {
return err
return nil, err
}
// If len(missingParents) == 0 then we know the tx is NOT an orphan
if len(missingParents) == 0 {
// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())
mp.server.RelayInventory(iv, tx)
// Accept any orphan transactions that depend on this
// transaction (they are no longer orphans) and repeat for those
// accepted transactions until there are no more.
mp.processOrphans(tx.Sha())
return nil
newTxs := mp.processOrphans(tx.Sha())
acceptedTxs := make([]*dcrutil.Tx, len(newTxs)+1)
// Add the parent transaction first so remote nodes
// do not add orphans.
acceptedTxs[0] = tx
copy(acceptedTxs[1:], newTxs)
return acceptedTxs, nil
}
// The transaction is an orphan (has inputs missing). Reject
// it if the flag to allow orphans is not set.
if !allowOrphan {
// Only use the first missing parent transaction in
// the error message.
//
// NOTE: RejectDuplicate is really not an accurate
// reject code here, but it matches the reference
// implementation and there isn't a better choice due
// to the limited number of reject codes. Missing
// inputs is assumed to mean they are already spent
// which is not really always the case.
var buf bytes.Buffer
buf.WriteString("transaction spends unknown inputs; includes " +
"inputs: \n")
lenIn := len(tx.MsgTx().TxIn)
for i, txIn := range tx.MsgTx().TxIn {
str := fmt.Sprintf("[%v]: %v, %v, %v",
i,
txIn.PreviousOutPoint.Hash,
txIn.PreviousOutPoint.Index,
txIn.PreviousOutPoint.Tree)
buf.WriteString(str)
if i != lenIn-1 {
buf.WriteString("\n")
}
}
txmpLog.Debugf("%v", buf.String())
return txRuleError(wire.RejectDuplicate,
"transaction spends unknown inputs")
str := fmt.Sprintf("orphan transaction %v references "+
"outputs of unknown or fully-spent "+
"transaction %v", tx.Sha(), missingParents[0])
return nil, txRuleError(wire.RejectDuplicate, str)
}
// Potentially add the orphan transaction to the orphan pool.
err = mp.maybeAddOrphan(tx)
if err != nil {
return err
return nil, err
}
return nil
return nil, nil
}
// Count returns the number of transactions in the main pool. It does not

View File

@ -4542,7 +4542,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
}
tx := dcrutil.NewTx(msgtx)
err = s.server.blockManager.ProcessTransaction(tx, false, false, allowHighFees)
acceptedTxs, err := s.server.blockManager.ProcessTransaction(tx, false,
false, allowHighFees)
if err != nil {
// When the error is a rule error, it means the transaction was
// simply rejected as opposed to something actually going wrong,
@ -4563,6 +4564,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st
}
}
s.server.AnnounceNewTransactions(acceptedTxs)
// Keep track of all the sendrawtransaction request txns so that they
// can be rebroadcast if they don't make their way into a block.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())

View File

@ -169,10 +169,36 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) {
s.modifyRebroadcastInv <- broadcastInventoryDel(iv)
}
// both websocket and getblocktemplate long poll clients of the passed
func (p *peerState) Count() int {
return len(p.peers) + len(p.outboundPeers) + len(p.persistentPeers)
}
// AnnounceNewTransactions generates and relays inventory vectors and notifies
// both websocket and getblocktemplate long poll clients of the passed
// transactions. This function should be called whenever new transactions
// are added to the mempool.
func (s *server) AnnounceNewTransactions(newTxs []*dcrutil.Tx) {
// Generate and relay inventory vectors for all newly accepted
// transactions into the memory pool due to the original being
// accepted.
for _, tx := range newTxs {
// Generate the inventory vector and relay it.
iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha())
s.RelayInventory(iv, tx)
if s.rpcServer != nil {
// Notify websocket clients about mempool transactions.
s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true)
// Potentially notify any getblocktemplate long poll clients
// about stale block templates due to the new transaction.
s.rpcServer.gbtWorkState.NotifyMempoolTx(
s.txMemPool.LastUpdated())
}
}
}
func (p *peerState) OutboundCount() int {
return len(p.outboundPeers) + len(p.persistentPeers)
}
@ -823,8 +849,8 @@ func (s *server) BanPeer(p *peer) {
s.banPeers <- p
}
// RelayInventory relays the passed inventory to all connected peers that are
// not already known to have it.
// RelayInventory relays the passed inventory vector to all connected peers
// that are not already known to have it.
func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) {
s.relayInv <- relayMsg{invVect: invVect, data: data}
}