diff --git a/blockmanager.go b/blockmanager.go index 7ef7054a..ea554f58 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -281,7 +281,8 @@ type processBlockMsg struct { // processTransactionResponse is a response sent to the reply channel of a // processTransactionMsg. type processTransactionResponse struct { - err error + acceptedTxs []*dcrutil.Tx + err error } // processTransactionMsg is a message type to be sent across the message @@ -910,7 +911,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. allowOrphans := cfg.MaxOrphanTxs > 0 - err := tmsg.peer.server.txMemPool.ProcessTransaction(tmsg.tx, + acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx, allowOrphans, true, true) // Remove transaction from request maps. Either the mempool/chain @@ -941,6 +942,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { false) return } + + b.server.AnnounceNewTransactions(acceptedTxs) } // current returns true if we believe we are synced with our peers, false if we @@ -2072,10 +2075,11 @@ out: } case processTransactionMsg: - err := b.server.txMemPool.ProcessTransaction(msg.tx, + acceptedTxs, err := b.server.txMemPool.ProcessTransaction(msg.tx, msg.allowOrphans, msg.rateLimit, msg.allowHighFees) msg.reply <- processTransactionResponse{ - err: err, + acceptedTxs: acceptedTxs, + err: err, } case isCurrentMsg: @@ -2278,7 +2282,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { b.server.txMemPool.RemoveTransaction(tx, false) b.server.txMemPool.RemoveDoubleSpends(tx) b.server.txMemPool.RemoveOrphan(tx.Sha()) - b.server.txMemPool.ProcessOrphans(tx.Sha()) + acceptedTxs := b.server.txMemPool.ProcessOrphans(tx.Sha()) + b.server.AnnounceNewTransactions(acceptedTxs) } for _, stx := range block.STransactions()[0:] { @@ -2753,11 +2758,12 @@ func (b *blockManager) ProcessBlock(block *dcrutil.Block, // a block chain. It is funneled through the block manager since blockchain is // not safe for concurrent access. func (b *blockManager) ProcessTransaction(tx *dcrutil.Tx, allowOrphans bool, - rateLimit bool, allowHighFees bool) error { + rateLimit bool, allowHighFees bool) ([]*dcrutil.Tx, error) { reply := make(chan processTransactionResponse, 1) - b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, allowHighFees, reply} + b.msgChan <- processTransactionMsg{tx, allowOrphans, rateLimit, + allowHighFees, reply} response := <-reply - return response.err + return response.acceptedTxs, response.err } // FetchTransactionStore makes use of FetchTransactionStore on an internal diff --git a/mempool.go b/mempool.go index fe057b22..99199474 100644 --- a/mempool.go +++ b/mempool.go @@ -6,7 +6,6 @@ package main import ( - "bytes" "container/list" "crypto/rand" "fmt" @@ -74,7 +73,6 @@ const ( // in a multi-signature transaction output script for it to be // considered standard. maxStandardMultiSigKeys = 3 - // minTxRelayFeeMainNet is the minimum fee in atoms that is required for a // transaction to be treated as free for relay and mining purposes. It // is also used to help determine if a transaction is considered dust @@ -1602,7 +1600,9 @@ func (mp *txMemPool) MaybeAcceptTransaction(tx *dcrutil.Tx, isNew, // ProcessOrphans. See the comment for ProcessOrphans for more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *txMemPool) processOrphans(hash *chainhash.Hash) { +func (mp *txMemPool) processOrphans(hash *chainhash.Hash) []*dcrutil.Tx { + var acceptedTxns []*dcrutil.Tx + // Start with processing at least the passed hash. processHashes := list.New() processHashes.PushBack(hash) @@ -1661,10 +1661,9 @@ func (mp *txMemPool) processOrphans(hash *chainhash.Hash) { continue } - // Generate and relay the inventory vector for the - // newly accepted transaction. - iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) - mp.server.RelayInventory(iv, tx) + // Add this transaction to the list of transactions + // that are no longer orphans. + acceptedTxns = append(acceptedTxns, tx) // Add this transaction to the list of transactions to // process so any orphans that depend on this one are @@ -1682,6 +1681,8 @@ func (mp *txMemPool) processOrphans(hash *chainhash.Hash) { processHashes.PushBack(orphanHash) } } + + return acceptedTxns } // PruneStakeTx is the function which is called everytime a new block is @@ -1744,11 +1745,16 @@ func (mp *txMemPool) pruneExpiredTx(height int64) { // newly accepted transactions (to detect further orphans which may no longer be // orphans) until there are no more. // +// It returns a slice of transactions added to the mempool. A nil slice means +// no transactions were moved from the orphan pool to the mempool. +// // This function is safe for concurrent access. -func (mp *txMemPool) ProcessOrphans(hash *chainhash.Hash) { +func (mp *txMemPool) ProcessOrphans(hash *chainhash.Hash) []*dcrutil.Tx { mp.Lock() - mp.processOrphans(hash) + acceptedTxns := mp.processOrphans(hash) mp.Unlock() + + return acceptedTxns } // ProcessTransaction is the main workhorse for handling insertion of new @@ -1756,9 +1762,14 @@ func (mp *txMemPool) ProcessOrphans(hash *chainhash.Hash) { // such as rejecting duplicate transactions, ensuring transactions follow all // rules, orphan transaction handling, and insertion into the memory pool. // +// It returns a slice of transactions added to the mempool. When the +// error is nil, the list will include the passed transaction itself along +// with any additional orphan transaactions that were added as a result of +// the passed one being accepted. +// // This function is safe for concurrent access. func (mp *txMemPool) ProcessTransaction(tx *dcrutil.Tx, allowOrphan, - rateLimit, allowHighFees bool) error { + rateLimit, allowHighFees bool) ([]*dcrutil.Tx, error) { // Protect concurrent access. mp.Lock() defer mp.Unlock() @@ -1768,59 +1779,50 @@ func (mp *txMemPool) ProcessTransaction(tx *dcrutil.Tx, allowOrphan, // Potentially accept the transaction to the memory pool. missingParents, err := mp.maybeAcceptTransaction(tx, true, rateLimit, allowHighFees) if err != nil { - return err + return nil, err } // If len(missingParents) == 0 then we know the tx is NOT an orphan if len(missingParents) == 0 { - // Generate the inventory vector and relay it. - iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) - mp.server.RelayInventory(iv, tx) - // Accept any orphan transactions that depend on this // transaction (they are no longer orphans) and repeat for those // accepted transactions until there are no more. - mp.processOrphans(tx.Sha()) - return nil + newTxs := mp.processOrphans(tx.Sha()) + acceptedTxs := make([]*dcrutil.Tx, len(newTxs)+1) + + // Add the parent transaction first so remote nodes + // do not add orphans. + acceptedTxs[0] = tx + copy(acceptedTxs[1:], newTxs) + + return acceptedTxs, nil } // The transaction is an orphan (has inputs missing). Reject // it if the flag to allow orphans is not set. if !allowOrphan { + // Only use the first missing parent transaction in + // the error message. + // // NOTE: RejectDuplicate is really not an accurate // reject code here, but it matches the reference // implementation and there isn't a better choice due // to the limited number of reject codes. Missing // inputs is assumed to mean they are already spent // which is not really always the case. - var buf bytes.Buffer - buf.WriteString("transaction spends unknown inputs; includes " + - "inputs: \n") - lenIn := len(tx.MsgTx().TxIn) - for i, txIn := range tx.MsgTx().TxIn { - str := fmt.Sprintf("[%v]: %v, %v, %v", - i, - txIn.PreviousOutPoint.Hash, - txIn.PreviousOutPoint.Index, - txIn.PreviousOutPoint.Tree) - buf.WriteString(str) - if i != lenIn-1 { - buf.WriteString("\n") - } - } - txmpLog.Debugf("%v", buf.String()) - - return txRuleError(wire.RejectDuplicate, - "transaction spends unknown inputs") + str := fmt.Sprintf("orphan transaction %v references "+ + "outputs of unknown or fully-spent "+ + "transaction %v", tx.Sha(), missingParents[0]) + return nil, txRuleError(wire.RejectDuplicate, str) } // Potentially add the orphan transaction to the orphan pool. err = mp.maybeAddOrphan(tx) if err != nil { - return err + return nil, err } - return nil + return nil, nil } // Count returns the number of transactions in the main pool. It does not diff --git a/rpcserver.go b/rpcserver.go index eb3bb566..8fe8f133 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -4542,7 +4542,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st } tx := dcrutil.NewTx(msgtx) - err = s.server.blockManager.ProcessTransaction(tx, false, false, allowHighFees) + acceptedTxs, err := s.server.blockManager.ProcessTransaction(tx, false, + false, allowHighFees) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -4563,6 +4564,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st } } + s.server.AnnounceNewTransactions(acceptedTxs) + // Keep track of all the sendrawtransaction request txns so that they // can be rebroadcast if they don't make their way into a block. iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) diff --git a/server.go b/server.go index 0b9e1243..aa10541e 100644 --- a/server.go +++ b/server.go @@ -169,10 +169,36 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { s.modifyRebroadcastInv <- broadcastInventoryDel(iv) } +// both websocket and getblocktemplate long poll clients of the passed func (p *peerState) Count() int { return len(p.peers) + len(p.outboundPeers) + len(p.persistentPeers) } +// AnnounceNewTransactions generates and relays inventory vectors and notifies +// both websocket and getblocktemplate long poll clients of the passed +// transactions. This function should be called whenever new transactions +// are added to the mempool. +func (s *server) AnnounceNewTransactions(newTxs []*dcrutil.Tx) { + // Generate and relay inventory vectors for all newly accepted + // transactions into the memory pool due to the original being + // accepted. + for _, tx := range newTxs { + // Generate the inventory vector and relay it. + iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) + s.RelayInventory(iv, tx) + + if s.rpcServer != nil { + // Notify websocket clients about mempool transactions. + s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.rpcServer.gbtWorkState.NotifyMempoolTx( + s.txMemPool.LastUpdated()) + } + } +} + func (p *peerState) OutboundCount() int { return len(p.outboundPeers) + len(p.persistentPeers) } @@ -823,8 +849,8 @@ func (s *server) BanPeer(p *peer) { s.banPeers <- p } -// RelayInventory relays the passed inventory to all connected peers that are -// not already known to have it. +// RelayInventory relays the passed inventory vector to all connected peers +// that are not already known to have it. func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) { s.relayInv <- relayMsg{invVect: invVect, data: data} }