diff --git a/mempool.go b/mempool.go index eec359ea..9b45adda 100644 --- a/mempool.go +++ b/mempool.go @@ -18,6 +18,7 @@ import ( "github.com/decred/dcrd/blockchain" "github.com/decred/dcrd/blockchain/stake" + "github.com/decred/dcrd/chaincfg" "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/database" "github.com/decred/dcrd/txscript" @@ -91,6 +92,51 @@ type mempoolTxDesc struct { StartingPriority float64 } +// mempoolConfig is a descriptor containing the memory pool configuration. +type mempoolConfig struct { + // ChainParams identifies which chain parameters the mempool is + // associated with. + ChainParams *chaincfg.Params + + // DisableRelayPriority defines whether to relay free or low-fee + // transactions that do not have enough priority to be relayed. + DisableRelayPriority bool + + // EnableAddrIndex defines whether the address index should be enabled. + EnableAddrIndex bool + + // FetchTransactionStore defines the function to use to fetch + // transacation information. + FetchTransactionStore func(*dcrutil.Tx, bool, bool) (blockchain.TxStore, error) + + // FreeTxRelayLimit defines the given amount in thousands of bytes + // per minute that transactions with no fee are rate limited to. + FreeTxRelayLimit float64 + + // MaxOrphanTxs defines the maximum number of orphan transactions to + // keep in memory. + MaxOrphanTxs int + + // MinRelayTxFee defines the minimum transaction fee in BTC/kB to be + // considered a non-zero fee. + MinRelayTxFee dcrutil.Amount + + // NewestSha defines the function to retrieve the newest sha. + NewestSha func() (*chainhash.Hash, int64, error) + + // NextStakeDifficulty defines the function to retrieve the stake + // difficulty for the block after the current best block. + // + // This function must be safe for concurrent access. + NextStakeDifficulty func() (int64, error) + + // SigCache defines a signature cache to use. + SigCache *txscript.SigCache + + // TimeSource defines the timesource to use. + TimeSource blockchain.MedianTimeSource +} + // txMemPool is used as a source of transactions that need to be mined into // blocks and relayed to other peers. It is safe for concurrent access from // multiple peers. @@ -99,7 +145,7 @@ type txMemPool struct { lastUpdated int64 // last time pool was updated. sync.RWMutex - server *server + cfg mempoolConfig pool map[chainhash.Hash]*mempoolTxDesc orphans map[chainhash.Hash]*dcrutil.Tx orphansByPrev map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx @@ -140,8 +186,8 @@ func (mp *txMemPool) insertVote(ssgen *dcrutil.Tx) error { "voting %v on the transaction tree", voteHash, blockHash, blockHeight, vote) - slice := make([]*VoteTx, int(mp.server.chainParams.TicketsPerBlock), - int(mp.server.chainParams.TicketsPerBlock)) + slice := make([]*VoteTx, int(mp.cfg.ChainParams.TicketsPerBlock), + int(mp.cfg.ChainParams.TicketsPerBlock)) slice[0] = voteTx mp.votes[blockHash] = slice return nil @@ -291,7 +337,7 @@ func (mp *txMemPool) sortParentsByVotes(currentTopBlock chainhash.Hash, sort.Sort(sort.Reverse(ByNumberOfVotes(bwlvs))) var sortedUsefulBlocks []chainhash.Hash - minimumVotesRequired := uint16((mp.server.chainParams.TicketsPerBlock / 2) + 1) + minimumVotesRequired := uint16((mp.cfg.ChainParams.TicketsPerBlock / 2) + 1) for _, bwlv := range bwlvs { if bwlv.Votes >= minimumVotesRequired { sortedUsefulBlocks = append(sortedUsefulBlocks, bwlv.Block) @@ -401,7 +447,7 @@ func (mp *txMemPool) RemoveOrphan(txHash *chainhash.Hash) { // // This function MUST be called with the mempool lock held (for writes). func (mp *txMemPool) limitNumOrphans() error { - if len(mp.orphans)+1 > cfg.MaxOrphanTxs && cfg.MaxOrphanTxs > 0 { + if len(mp.orphans)+1 > mp.cfg.MaxOrphanTxs && mp.cfg.MaxOrphanTxs > 0 { // Generate a cryptographically random hash. randHashBytes := make([]byte, chainhash.HashSize) _, err := rand.Read(randHashBytes) @@ -467,7 +513,7 @@ func (mp *txMemPool) maybeAddOrphan(tx *dcrutil.Tx) error { // // Note that the number of orphan transactions in the orphan pool is // also limited, so this equates to a maximum memory used of - // maxOrphanTxSize * cfg.MaxOrphanTxs (which is ~5MB using the default + // maxOrphanTxSize * mp.cfg.MaxOrphanTxs (which is ~5MB using the default // values at the time this comment was written). serializedLen := tx.MsgTx().SerializeSize() if serializedLen > maxOrphanTxSize { @@ -602,15 +648,18 @@ func (mp *txMemPool) removeTransaction(tx *dcrutil.Tx, removeRedeemers bool) { // Remove the transaction and mark the referenced outpoints as unspent // by the pool. if txDesc, exists := mp.pool[*txHash]; exists { + // Remove the transaction and its addresses from the address + // index if it's enabled. + if mp.cfg.EnableAddrIndex { + mp.pruneTxFromAddrIndex(tx, txType) + } + for _, txIn := range txDesc.Tx.MsgTx().TxIn { delete(mp.outpoints, txIn.PreviousOutPoint) } delete(mp.pool, *txHash) atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) } - - // Remove the transaction and its addresses from the address index. - mp.pruneTxFromAddrIndex(tx, txType) } // RemoveTransaction removes the passed transaction from the mempool. If @@ -672,6 +721,32 @@ func (mp *txMemPool) addTransaction(txStore blockchain.TxStore, tx *dcrutil.Tx, mp.outpoints[txIn.PreviousOutPoint] = tx } atomic.StoreInt64(&mp.lastUpdated, time.Now().Unix()) + + // Add the addresses associated with the transaction to the address + // index if it's enabled. + if mp.cfg.EnableAddrIndex { + mp.addTransactionToAddrIndex(tx, txType) + } +} + +// addTransactionToAddrIndex adds all addresses related to the transaction to +// our in-memory address index. Note that this address is only populated when +// we're running with the optional address index activated. +// +// This function MUST be called with the mempool lock held (for writes). +func (mp *txMemPool) addTransactionToAddrIndex(tx *dcrutil.Tx, + txType stake.TxType) error { + + // Insert the addresses into the mempool address index. + for _, txOut := range tx.MsgTx().TxOut { + err := mp.indexScriptAddressToTx(txOut.Version, txOut.PkScript, + tx, txType) + if err != nil { + return err + } + } + + return nil } // fetchReferencedOutputScripts looks up and returns all the scriptPubKeys @@ -715,7 +790,7 @@ func (mp *txMemPool) indexScriptAddressToTx(pkVersion uint16, pkScript []byte, // An exception is SStx commitments. Handle these manually. if txType == stake.TxTypeSStx && class == txscript.NullDataTy { addr, err := stake.AddrFromSStxPkScrCommitment(pkScript, - mp.server.chainParams) + mp.cfg.ChainParams) if err != nil { txmpLog.Tracef("Unable to extract encoded addresses "+ "from sstx commitment script for addrindex: %v", err) @@ -754,7 +829,7 @@ func (mp *txMemPool) pruneTxFromAddrIndex(tx *dcrutil.Tx, txType stake.TxType) { // An exception is SStx commitments. Handle these manually. if txType == stake.TxTypeSStx && class == txscript.NullDataTy { addr, err := stake.AddrFromSStxPkScrCommitment(txOut.PkScript, - mp.server.chainParams) + mp.cfg.ChainParams) if err != nil { // If we couldn't extract addresses, skip this output. continue @@ -854,7 +929,7 @@ func (mp *txMemPool) isTxTreeValid(newestHash *chainhash.Hash) bool { // There are not possibly enough votes to tell if the txTree is valid; // assume it's valid. if len(mp.votes[*newestHash]) <= - int(mp.server.chainParams.TicketsPerBlock/2) { + int(mp.cfg.ChainParams.TicketsPerBlock/2) { return true } @@ -897,9 +972,13 @@ func (mp *txMemPool) IsTxTreeValid(best *chainhash.Hash) bool { // This function MUST be called with the mempool lock held (for reads). func (mp *txMemPool) fetchInputTransactions(tx *dcrutil.Tx, includeSpent bool) (blockchain.TxStore, error) { - tv := mp.IsTxTreeValid(mp.server.blockManager.chainState.newestHash) - txStore, err := mp.server.blockManager.blockChain.FetchTransactionStore(tx, - tv, includeSpent) + + newestHash, _, err := mp.cfg.NewestSha() + if err != nil { + return nil, err + } + tv := mp.IsTxTreeValid(newestHash) + txStore, err := mp.cfg.FetchTransactionStore(tx, tv, includeSpent) if err != nil { return nil, err } @@ -986,7 +1065,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // Perform preliminary sanity checks on the transaction. This makes // use of chain which contains the invariant rules for what // transactions are allowed into blocks. - err := blockchain.CheckTransactionSanity(tx, mp.server.chainParams) + err := blockchain.CheckTransactionSanity(tx, mp.cfg.ChainParams) if err != nil { if cerr, ok := err.(blockchain.RuleError); ok { return nil, chainRuleError(cerr) @@ -1014,7 +1093,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // Get the current height of the main chain. A standalone transaction // will be mined into the next block at best, so it's height is at least // one more than the current height. - _, curHeight, err := mp.server.db.NewestSha() + _, curHeight, err := mp.cfg.NewestSha() if err != nil { // This is an unexpected error so don't turn it into a rule // error. @@ -1036,7 +1115,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // forbid their relaying. if !activeNetParams.RelayNonStdTxs { err := checkTransactionStandard(tx, txType, nextBlockHeight, - mp.server.timeSource, cfg.minRelayTxFee) + mp.cfg.TimeSource, mp.cfg.MinRelayTxFee) if err != nil { // Attempt to extract a reject code from the error so // it can be retained. When not possible, fall back to @@ -1054,9 +1133,12 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // If the transaction is a ticket, ensure that it meets the next // stake difficulty. if txType == stake.TxTypeSStx { - mp.server.blockManager.chainState.Lock() - sDiff := mp.server.blockManager.chainState.nextStakeDifficulty - mp.server.blockManager.chainState.Unlock() + sDiff, err := mp.cfg.NextStakeDifficulty() + if err != nil { + // This is an unexpected error so don't turn it into a + // rule error. + return nil, err + } if tx.MsgTx().TxOut[0].Value < sDiff { str := fmt.Sprintf("transaction %v has not enough funds "+ @@ -1176,7 +1258,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, nextBlockHeight, txStore, false, // Don't check fraud proof; filled in by miner - mp.server.chainParams) + mp.cfg.ChainParams) if err != nil { if cerr, ok := err.(blockchain.RuleError); ok { return nil, chainRuleError(cerr) @@ -1240,7 +1322,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // high-priority transactions, don't require a fee for it. // This applies to non-stake transactions only. serializedSize := int64(tx.MsgTx().SerializeSize()) - minFee := calcMinRequiredTxRelayFee(serializedSize, cfg.minRelayTxFee) + minFee := calcMinRequiredTxRelayFee(serializedSize, mp.cfg.MinRelayTxFee) if txType == stake.TxTypeRegular { // Non-stake only if serializedSize >= (defaultBlockPrioritySize-1000) && txFee < minFee { str := fmt.Sprintf("transaction %v has %v fees which is under "+ @@ -1254,8 +1336,9 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // in the next block. Transactions which are being added back to the // memory pool from blocks that have been disconnected during a reorg // are exempted. + // // This applies to non-stake transactions only. - if isNew && !cfg.NoRelayPriority && txFee < minFee && + if isNew && !mp.cfg.DisableRelayPriority && txFee < minFee && txType == stake.TxTypeRegular { currentPriority := calcPriority(tx.MsgTx(), txStore, @@ -1280,7 +1363,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, mp.lastPennyUnix = nowUnix // Are we still over the limit? - if mp.pennyTotal >= cfg.FreeTxRelayLimit*10*1000 { + if mp.pennyTotal >= mp.cfg.FreeTxRelayLimit*10*1000 { str := fmt.Sprintf("transaction %v has been rejected "+ "by the rate limiter due to low fees", txHash) return nil, txRuleError(wire.RejectInsufficientFee, str) @@ -1290,7 +1373,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, mp.pennyTotal += float64(serializedSize) txmpLog.Tracef("rate limit: curTotal %v, nextTotal: %v, "+ "limit %v", oldTotal, mp.pennyTotal, - cfg.FreeTxRelayLimit*10*1000) + mp.cfg.FreeTxRelayLimit*10*1000) } // Set an absolute threshold for ticket rejection and obey it. Tickets @@ -1324,7 +1407,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, // Verify crypto signatures for each input and reject the transaction if // any don't verify. err = blockchain.ValidateTransactionScripts(tx, txStore, - txscript.StandardVerifyFlags, mp.server.sigCache) + txscript.StandardVerifyFlags, mp.cfg.SigCache) if err != nil { if cerr, ok := err.(blockchain.RuleError); ok { return nil, chainRuleError(cerr) @@ -1354,15 +1437,6 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *dcrutil.Tx, isNew, txmpLog.Debugf("Accepted transaction %v (pool size: %v)", txHash, len(mp.pool)) - if mp.server.rpcServer != nil { - // Notify websocket clients about mempool transactions. - mp.server.rpcServer.ntfnMgr.NotifyMempoolTx(tx, isNew) - - // Potentially notify any getblocktemplate long poll clients - // about stale block templates due to the new transaction. - mp.server.rpcServer.gbtWorkState.NotifyMempoolTx(mp.LastUpdated()) - } - return nil, nil } @@ -1706,9 +1780,9 @@ func (mp *txMemPool) CheckIfTxsExist(hashes []chainhash.Hash) bool { // newTxMemPool returns a new memory pool for validating and storing standalone // transactions until they are mined into a block. -func newTxMemPool(server *server) *txMemPool { +func newTxMemPool(cfg *mempoolConfig) *txMemPool { memPool := &txMemPool{ - server: server, + cfg: *cfg, pool: make(map[chainhash.Hash]*mempoolTxDesc), orphans: make(map[chainhash.Hash]*dcrutil.Tx), orphansByPrev: make(map[chainhash.Hash]map[chainhash.Hash]*dcrutil.Tx), @@ -1716,7 +1790,7 @@ func newTxMemPool(server *server) *txMemPool { votes: make(map[chainhash.Hash][]*VoteTx), } - if !cfg.NoAddrIndex { + if cfg.EnableAddrIndex { memPool.addrindex = make(map[string]map[chainhash.Hash]struct{}) } return memPool diff --git a/mining.go b/mining.go index 2cbbd8b2..01ab6464 100644 --- a/mining.go +++ b/mining.go @@ -703,9 +703,7 @@ func medianAdjustedTime(chainState *chainState, // valid from the perspective of the mainchain (not necessarily // the mempool or block) before inserting into a tx tree. // If it fails the check, it returns false; otherwise true. -func maybeInsertStakeTx(mp *txMemPool, stx *dcrutil.Tx, treeValid bool) bool { - bm := mp.server.blockManager - +func maybeInsertStakeTx(bm *blockManager, stx *dcrutil.Tx, treeValid bool) bool { missingInput := false txStore, err := bm.FetchTransactionStore(stx, treeValid) @@ -1714,7 +1712,7 @@ mempoolLoop: if isSSGen, _ := stake.IsSSGen(tx); isSSGen { txCopy := dcrutil.NewTxDeepTxIns(tx.MsgTx()) - if maybeInsertStakeTx(mempool, txCopy, treeValid) { + if maybeInsertStakeTx(blockManager, txCopy, treeValid) { vb := stake.GetSSGenVoteBits(txCopy) voteBitsVoters = append(voteBitsVoters, vb) blockTxnsStake = append(blockTxnsStake, txCopy) @@ -1820,7 +1818,7 @@ mempoolLoop: // Quick check for difficulty here. if tx.MsgTx().TxOut[0].Value >= requiredStakeDifficulty { txCopy := dcrutil.NewTxDeepTxIns(tx.MsgTx()) - if maybeInsertStakeTx(mempool, txCopy, treeValid) { + if maybeInsertStakeTx(blockManager, txCopy, treeValid) { blockTxnsStake = append(blockTxnsStake, txCopy) freshStake++ } @@ -1843,7 +1841,7 @@ mempoolLoop: isSSRtx, _ := stake.IsSSRtx(tx) if tx.Tree() == dcrutil.TxTreeStake && isSSRtx { txCopy := dcrutil.NewTxDeepTxIns(tx.MsgTx()) - if maybeInsertStakeTx(mempool, txCopy, treeValid) { + if maybeInsertStakeTx(blockManager, txCopy, treeValid) { blockTxnsStake = append(blockTxnsStake, txCopy) revocations++ } diff --git a/server.go b/server.go index d1b828a6..4431ef49 100644 --- a/server.go +++ b/server.go @@ -2494,7 +2494,32 @@ func newServer(listenAddrs []string, database database.Db, tmdb *stake.TicketDB, return nil, err } s.blockManager = bm - s.txMemPool = newTxMemPool(&s) + + txC := mempoolConfig{ + ChainParams: chainParams, + DisableRelayPriority: cfg.NoRelayPriority, + EnableAddrIndex: !cfg.NoAddrIndex, + FetchTransactionStore: s.blockManager.blockChain.FetchTransactionStore, + FreeTxRelayLimit: cfg.FreeTxRelayLimit, + MaxOrphanTxs: cfg.MaxOrphanTxs, + MinRelayTxFee: cfg.minRelayTxFee, + NewestSha: func() (*chainhash.Hash, int64, error) { + bm.chainState.Lock() + hash := bm.chainState.newestHash + height := bm.chainState.newestHeight + bm.chainState.Unlock() + return hash, height, nil + }, + NextStakeDifficulty: func() (int64, error) { + bm.chainState.Lock() + sDiff := bm.chainState.nextStakeDifficulty + bm.chainState.Unlock() + return sDiff, nil + }, + SigCache: s.sigCache, + TimeSource: s.timeSource, + } + s.txMemPool = newTxMemPool(&txC) // Create the mining policy based on the configuration options. policy := miningPolicy{