From a1bb291b282433035f63b17773c38fc9e0f103ff Mon Sep 17 00:00:00 2001 From: David Hill Date: Thu, 14 Apr 2016 13:58:09 -0400 Subject: [PATCH 1/3] mempool: Have ProcessTransaction return accepted transactions. (#547) It is not the responsibility of mempool to relay transactions, so return a slice of transactions accepted to the mempool due to the passed transaction to the caller. --- blockmanager.go | 7 +++- mempool.go | 98 +++++++++++++++++++++++++++---------------------- rpcserver.go | 4 +- server.go | 74 ++++++++++++++----------------------- 4 files changed, 90 insertions(+), 93 deletions(-) diff --git a/blockmanager.go b/blockmanager.go index be8c395a..b8c4a55b 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -454,7 +454,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // Process the transaction to include validation, insertion in the // memory pool, orphan handling, etc. allowOrphans := cfg.MaxOrphanTxs > 0 - err := b.server.txMemPool.ProcessTransaction(tmsg.tx, + acceptedTxs, err := b.server.txMemPool.ProcessTransaction(tmsg.tx, allowOrphans, true) // Remove transaction from request maps. Either the mempool/chain @@ -489,6 +489,8 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { false) return } + + b.server.AnnounceNewTransactions(acceptedTxs) } // current returns true if we believe we are synced with our peers, false if we @@ -1209,7 +1211,8 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { b.server.txMemPool.RemoveTransaction(tx, false) b.server.txMemPool.RemoveDoubleSpends(tx) b.server.txMemPool.RemoveOrphan(tx.Sha()) - b.server.txMemPool.ProcessOrphans(tx.Sha()) + acceptedTxs := b.server.txMemPool.ProcessOrphans(tx.Sha()) + b.server.AnnounceNewTransactions(acceptedTxs) } if r := b.server.rpcServer; r != nil { diff --git a/mempool.go b/mempool.go index 6947424d..9feeb675 100644 --- a/mempool.go +++ b/mempool.go @@ -52,10 +52,6 @@ type mempoolConfig struct { // the current best chain. Chain *blockchain.BlockChain - // RelayNtfnChan defines the channel to send newly accepted transactions - // to. If unset or set to nil, notifications will not be sent. - RelayNtfnChan chan *btcutil.Tx - // SigCache defines a signature cache to use. SigCache *txscript.SigCache @@ -751,7 +747,9 @@ func (mp *txMemPool) MaybeAcceptTransaction(tx *btcutil.Tx, isNew, rateLimit boo // ProcessOrphans. See the comment for ProcessOrphans for more details. // // This function MUST be called with the mempool lock held (for writes). -func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { +func (mp *txMemPool) processOrphans(hash *wire.ShaHash) []*btcutil.Tx { + var acceptedTxns []*btcutil.Tx + // Start with processing at least the passed hash. processHashes := list.New() processHashes.PushBack(hash) @@ -807,10 +805,9 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { continue } - // Notify the caller of the new tx added to mempool. - if mp.cfg.RelayNtfnChan != nil { - mp.cfg.RelayNtfnChan <- tx - } + // Add this transaction to the list of transactions + // that are no longer orphans. + acceptedTxns = append(acceptedTxns, tx) // Add this transaction to the list of transactions to // process so any orphans that depend on this one are @@ -828,6 +825,8 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { processHashes.PushBack(orphanHash) } } + + return acceptedTxns } // ProcessOrphans determines if there are any orphans which depend on the passed @@ -836,11 +835,16 @@ func (mp *txMemPool) processOrphans(hash *wire.ShaHash) { // newly accepted transactions (to detect further orphans which may no longer be // orphans) until there are no more. // +// It returns a slice of transactions added to the mempool. A nil slice means +// no transactions were moved from the orphan pool to the mempool. +// // This function is safe for concurrent access. -func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) { +func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) []*btcutil.Tx { mp.Lock() - mp.processOrphans(hash) + acceptedTxns := mp.processOrphans(hash) mp.Unlock() + + return acceptedTxns } // ProcessTransaction is the main workhorse for handling insertion of new @@ -848,8 +852,13 @@ func (mp *txMemPool) ProcessOrphans(hash *wire.ShaHash) { // such as rejecting duplicate transactions, ensuring transactions follow all // rules, orphan transaction handling, and insertion into the memory pool. // +// It returns a slice of transactions added to the mempool. When the +// error is nil, the list will include the passed transaction itself along +// with any additional orphan transaactions that were added as a result of +// the passed one being accepted. +// // This function is safe for concurrent access. -func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) error { +func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit bool) ([]*btcutil.Tx, error) { // Protect concurrent access. mp.Lock() defer mp.Unlock() @@ -859,47 +868,50 @@ func (mp *txMemPool) ProcessTransaction(tx *btcutil.Tx, allowOrphan, rateLimit b // Potentially accept the transaction to the memory pool. missingParents, err := mp.maybeAcceptTransaction(tx, true, rateLimit) if err != nil { - return err + return nil, err } if len(missingParents) == 0 { - // Notify the caller that the tx was added to the mempool. - if mp.cfg.RelayNtfnChan != nil { - mp.cfg.RelayNtfnChan <- tx - } - // Accept any orphan transactions that depend on this // transaction (they may no longer be orphans if all inputs // are now available) and repeat for those accepted // transactions until there are no more. - mp.processOrphans(tx.Sha()) - } else { - // The transaction is an orphan (has inputs missing). Reject - // it if the flag to allow orphans is not set. - if !allowOrphan { - // Only use the first missing parent transaction in - // the error message. - // - // NOTE: RejectDuplicate is really not an accurate - // reject code here, but it matches the reference - // implementation and there isn't a better choice due - // to the limited number of reject codes. Missing - // inputs is assumed to mean they are already spent - // which is not really always the case. - str := fmt.Sprintf("orphan transaction %v references "+ - "outputs of unknown or fully-spent "+ - "transaction %v", tx.Sha(), missingParents[0]) - return txRuleError(wire.RejectDuplicate, str) - } + newTxs := mp.processOrphans(tx.Sha()) + acceptedTxs := make([]*btcutil.Tx, len(newTxs)+1) - // Potentially add the orphan transaction to the orphan pool. - err := mp.maybeAddOrphan(tx) - if err != nil { - return err - } + // Add the parent transaction first so remote nodes + // do not add orphans. + acceptedTxs[0] = tx + copy(acceptedTxs[1:], newTxs) + + return acceptedTxs, nil } - return nil + // The transaction is an orphan (has inputs missing). Reject + // it if the flag to allow orphans is not set. + if !allowOrphan { + // Only use the first missing parent transaction in + // the error message. + // + // NOTE: RejectDuplicate is really not an accurate + // reject code here, but it matches the reference + // implementation and there isn't a better choice due + // to the limited number of reject codes. Missing + // inputs is assumed to mean they are already spent + // which is not really always the case. + str := fmt.Sprintf("orphan transaction %v references "+ + "outputs of unknown or fully-spent "+ + "transaction %v", tx.Sha(), missingParents[0]) + return nil, txRuleError(wire.RejectDuplicate, str) + } + + // Potentially add the orphan transaction to the orphan pool. + err = mp.maybeAddOrphan(tx) + if err != nil { + return nil, err + } + + return nil, nil } // Count returns the number of transactions in the main pool. It does not diff --git a/rpcserver.go b/rpcserver.go index 0533b153..2d1564c1 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3438,7 +3438,7 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st } tx := btcutil.NewTx(msgtx) - err = s.server.txMemPool.ProcessTransaction(tx, false, false) + acceptedTxs, err := s.server.txMemPool.ProcessTransaction(tx, false, false) if err != nil { // When the error is a rule error, it means the transaction was // simply rejected as opposed to something actually going wrong, @@ -3459,6 +3459,8 @@ func handleSendRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan st } } + s.server.AnnounceNewTransactions(acceptedTxs) + // Keep track of all the sendrawtransaction request txns so that they // can be rebroadcast if they don't make their way into a block. iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) diff --git a/server.go b/server.go index b7fe2400..f2c64ec9 100644 --- a/server.go +++ b/server.go @@ -185,7 +185,6 @@ type server struct { blockManager *blockManager txMemPool *txMemPool cpuMiner *CPUMiner - relayNtfnChan chan *btcutil.Tx modifyRebroadcastInv chan interface{} pendingPeers chan *serverPeer newPeers chan *serverPeer @@ -961,6 +960,31 @@ func (s *server) RemoveRebroadcastInventory(iv *wire.InvVect) { s.modifyRebroadcastInv <- broadcastInventoryDel(iv) } +// AnnounceNewTransactions generates and relays inventory vectors and notifies +// both websocket and getblocktemplate long poll clients of the passed +// transactions. This function should be called whenever new transactions +// are added to the mempool. +func (s *server) AnnounceNewTransactions(newTxs []*btcutil.Tx) { + // Generate and relay inventory vectors for all newly accepted + // transactions into the memory pool due to the original being + // accepted. + for _, tx := range newTxs { + // Generate the inventory vector and relay it. + iv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) + s.RelayInventory(iv, tx) + + if s.rpcServer != nil { + // Notify websocket clients about mempool transactions. + s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true) + + // Potentially notify any getblocktemplate long poll clients + // about stale block templates due to the new transaction. + s.rpcServer.gbtWorkState.NotifyMempoolTx( + s.txMemPool.LastUpdated()) + } + } +} + // pushTxMsg sends a tx message for the provided transaction hash to the // connected peer. An error is returned if the transaction hash is not known. func (s *server) pushTxMsg(sp *serverPeer, sha *wire.ShaHash, doneChan chan<- struct{}, waitChan <-chan struct{}) error { @@ -1871,8 +1895,8 @@ func (s *server) BanPeer(sp *serverPeer) { s.banPeers <- sp } -// RelayInventory relays the passed inventory to all connected peers that are -// not already known to have it. +// RelayInventory relays the passed inventory vector to all connected peers +// that are not already known to have it. func (s *server) RelayInventory(invVect *wire.InvVect, data interface{}) { s.relayInv <- relayMsg{invVect: invVect, data: data} } @@ -2008,45 +2032,6 @@ func (s *server) UpdatePeerHeights(latestBlkSha *wire.ShaHash, latestHeight int3 } } -// relayTransactionsHandler relays transactions sent on relayNtfnChan to other -// peers and to the RPC infrastructure if necessary. It must be run as a -// goroutine. -func (s *server) relayTransactionsHandler() { -out: - for { - select { - case tx := <-s.relayNtfnChan: - // Generate an inv and relay it. - inv := wire.NewInvVect(wire.InvTypeTx, tx.Sha()) - s.RelayInventory(inv, tx) - - if s.rpcServer != nil { - // Notify websocket clients about mempool transactions. - s.rpcServer.ntfnMgr.NotifyMempoolTx(tx, true) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - s.rpcServer.gbtWorkState.NotifyMempoolTx( - s.txMemPool.LastUpdated()) - } - - case <-s.quit: - break out - } - } - - // Drain channels before exiting so nothing is left waiting around to send. -cleanup: - for { - select { - case <-s.relayNtfnChan: - default: - break cleanup - } - } - s.wg.Done() -} - // rebroadcastHandler keeps track of user submitted inventories that we have // sent out but have not yet made it into a block. We periodically rebroadcast // them in case our peers restarted or otherwise lost track of them. @@ -2126,9 +2111,6 @@ func (s *server) Start() { s.wg.Add(1) go s.peerHandler() - s.wg.Add(1) - go s.relayTransactionsHandler() - if s.nat != nil { s.wg.Add(1) go s.upnpUpdateThread() @@ -2480,7 +2462,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param relayInv: make(chan relayMsg, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers), quit: make(chan struct{}), - relayNtfnChan: make(chan *btcutil.Tx, cfg.MaxPeers), modifyRebroadcastInv: make(chan interface{}), peerHeightsUpdate: make(chan updatePeerHeightsMsg), nat: nat, @@ -2539,7 +2520,6 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param }, FetchUtxoView: s.blockManager.chain.FetchUtxoView, Chain: s.blockManager.chain, - RelayNtfnChan: s.relayNtfnChan, SigCache: s.sigCache, TimeSource: s.timeSource, AddrIndex: s.addrIndex, From 474547b211c8f5566d094478318fcfd3c2c838d4 Mon Sep 17 00:00:00 2001 From: David Hill Date: Thu, 14 Apr 2016 14:38:25 -0400 Subject: [PATCH 2/3] travis: run tests on latest golang (1.5.4 and 1.6.1) (#669) --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 8c24901c..d003fea9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: go go: - - 1.5.3 - - 1.6 + - 1.5.4 + - 1.6.1 sudo: false before_install: - gotools=golang.org/x/tools From b87723cd94ea11c29e22c4372ba4fe96886e7c83 Mon Sep 17 00:00:00 2001 From: Dave Collins Date: Wed, 20 Apr 2016 23:58:31 -0500 Subject: [PATCH 3/3] btcd: Remove peer-specific logging funcs. (#675) This removes the logging functions that are now implemented in the peer package as they are no longer used by btcd itself and should have been removed when they were copied into the peer package. --- log.go | 153 --------------------------------------------------------- 1 file changed, 153 deletions(-) diff --git a/log.go b/log.go index b98197b7..f249927a 100644 --- a/log.go +++ b/log.go @@ -7,8 +7,6 @@ package main import ( "fmt" "os" - "strings" - "time" "github.com/btcsuite/btcd/addrmgr" "github.com/btcsuite/btcd/blockchain" @@ -16,7 +14,6 @@ import ( "github.com/btcsuite/btcd/database" "github.com/btcsuite/btcd/peer" "github.com/btcsuite/btcd/txscript" - "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btclog" "github.com/btcsuite/seelog" ) @@ -210,153 +207,3 @@ func directionString(inbound bool) string { } return "outbound" } - -// formatLockTime returns a transaction lock time as a human-readable string. -func formatLockTime(lockTime uint32) string { - // The lock time field of a transaction is either a block height at - // which the transaction is finalized or a timestamp depending on if the - // value is before the txscript.LockTimeThreshold. When it is under the - // threshold it is a block height. - if lockTime < txscript.LockTimeThreshold { - return fmt.Sprintf("height %d", lockTime) - } - - return time.Unix(int64(lockTime), 0).String() -} - -// invSummary returns an inventory message as a human-readable string. -func invSummary(invList []*wire.InvVect) string { - // No inventory. - invLen := len(invList) - if invLen == 0 { - return "empty" - } - - // One inventory item. - if invLen == 1 { - iv := invList[0] - switch iv.Type { - case wire.InvTypeError: - return fmt.Sprintf("error %s", iv.Hash) - case wire.InvTypeBlock: - return fmt.Sprintf("block %s", iv.Hash) - case wire.InvTypeTx: - return fmt.Sprintf("tx %s", iv.Hash) - } - - return fmt.Sprintf("unknown (%d) %s", uint32(iv.Type), iv.Hash) - } - - // More than one inv item. - return fmt.Sprintf("size %d", invLen) -} - -// locatorSummary returns a block locator as a human-readable string. -func locatorSummary(locator []*wire.ShaHash, stopHash *wire.ShaHash) string { - if len(locator) > 0 { - return fmt.Sprintf("locator %s, stop %s", locator[0], stopHash) - } - - return fmt.Sprintf("no locator, stop %s", stopHash) - -} - -// sanitizeString strips any characters which are even remotely dangerous, such -// as html control characters, from the passed string. It also limits it to -// the passed maximum size, which can be 0 for unlimited. When the string is -// limited, it will also add "..." to the string to indicate it was truncated. -func sanitizeString(str string, maxLength uint) string { - const safeChars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXY" + - "Z01234567890 .,;_/:?@" - - // Strip any characters not in the safeChars string removed. - str = strings.Map(func(r rune) rune { - if strings.IndexRune(safeChars, r) >= 0 { - return r - } - return -1 - }, str) - - // Limit the string to the max allowed length. - if maxLength > 0 && uint(len(str)) > maxLength { - str = str[:maxLength] - str = str + "..." - } - return str -} - -// messageSummary returns a human-readable string which summarizes a message. -// Not all messages have or need a summary. This is used for debug logging. -func messageSummary(msg wire.Message) string { - switch msg := msg.(type) { - case *wire.MsgVersion: - return fmt.Sprintf("agent %s, pver %d, block %d", - msg.UserAgent, msg.ProtocolVersion, msg.LastBlock) - - case *wire.MsgVerAck: - // No summary. - - case *wire.MsgGetAddr: - // No summary. - - case *wire.MsgAddr: - return fmt.Sprintf("%d addr", len(msg.AddrList)) - - case *wire.MsgPing: - // No summary - perhaps add nonce. - - case *wire.MsgPong: - // No summary - perhaps add nonce. - - case *wire.MsgAlert: - // No summary. - - case *wire.MsgMemPool: - // No summary. - - case *wire.MsgTx: - return fmt.Sprintf("hash %s, %d inputs, %d outputs, lock %s", - msg.TxSha(), len(msg.TxIn), len(msg.TxOut), - formatLockTime(msg.LockTime)) - - case *wire.MsgBlock: - header := &msg.Header - return fmt.Sprintf("hash %s, ver %d, %d tx, %s", msg.BlockSha(), - header.Version, len(msg.Transactions), header.Timestamp) - - case *wire.MsgInv: - return invSummary(msg.InvList) - - case *wire.MsgNotFound: - return invSummary(msg.InvList) - - case *wire.MsgGetData: - return invSummary(msg.InvList) - - case *wire.MsgGetBlocks: - return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop) - - case *wire.MsgGetHeaders: - return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop) - - case *wire.MsgHeaders: - return fmt.Sprintf("num %d", len(msg.Headers)) - - case *wire.MsgReject: - // Ensure the variable length strings don't contain any - // characters which are even remotely dangerous such as HTML - // control characters, etc. Also limit them to sane length for - // logging. - rejCommand := sanitizeString(msg.Cmd, wire.CommandSize) - rejReason := sanitizeString(msg.Reason, maxRejectReasonLen) - summary := fmt.Sprintf("cmd %v, code %v, reason %v", rejCommand, - msg.Code, rejReason) - if rejCommand == wire.CmdBlock || rejCommand == wire.CmdTx { - summary += fmt.Sprintf(", hash %v", msg.Hash) - } - return summary - } - - // No summary for other messages. - return "" -}