diff --git a/blockmanager.go b/blockmanager.go index 56d76da3..a676435a 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -1891,6 +1891,10 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { r.ntfnMgr.NotifyBlockConnected(block) } + if b.server.bg != nil { + b.server.bg.handleConnectedBlock(block.Height()) + } + // Stake tickets are spent or missed from the most recently connected block. case blockchain.NTSpentAndMissedTickets: tnd, ok := notification.Data.(*blockchain.TicketNotificationsData) @@ -1982,6 +1986,14 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { handleDisconnectedBlockTxns(block.Transactions()[1:]) handleDisconnectedBlockTxns(block.STransactions()) + if b.server.bg != nil { + b.server.bg.handleDisconnectedBlock(block.Height()) + } + + // Filter and update the rebroadcast inventory. + b.server.PruneRebroadcastInventory() + + // Notify registered websocket clients. if r := b.server.rpcServer; r != nil { // Filter and update the rebroadcast inventory. b.server.PruneRebroadcastInventory() @@ -1990,6 +2002,18 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) { r.ntfnMgr.NotifyBlockDisconnected(block) } + // Chain reorganization has commenced. + case blockchain.NTChainReorgStarted: + if b.server.bg != nil { + b.server.bg.handleChainReorgStarted() + } + + // Chain reorganization has concluded. + case blockchain.NTChainReorgDone: + if b.server.bg != nil { + b.server.bg.handleChainReorgDone() + } + // The blockchain is reorganizing. case blockchain.NTReorganization: rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData) diff --git a/mempool/mempool.go b/mempool/mempool.go index b12251fc..658edd2a 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -125,6 +125,10 @@ type Config struct { // whenever a transaction is removed from the mempool in order to track fee // estimation. RemoveTxFromFeeEstimation func(txHash *chainhash.Hash) + + // OnVoteReceived defines the function used to signal receiving a new + // vote in the mempool. + OnVoteReceived func(voteTx *wire.MsgTx) } // Policy houses the policy (configuration parameters) which is used to @@ -676,6 +680,9 @@ func (mp *TxPool) RemoveDoubleSpends(tx *dcrutil.Tx) { // This function MUST be called with the mempool lock held (for writes). func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint, tx *dcrutil.Tx, txType stake.TxType, height int64, fee int64) { + if mp.cfg.OnVoteReceived != nil && txType == stake.TxTypeSSGen { + mp.cfg.OnVoteReceived(tx.MsgTx()) + } // Add the transaction to the pool and mark the referenced outpoints // as spent by the pool. diff --git a/mempool/mempool_test.go b/mempool/mempool_test.go index dadf01ad..a662f9e2 100644 --- a/mempool/mempool_test.go +++ b/mempool/mempool_test.go @@ -752,6 +752,7 @@ func newPoolHarness(chainParams *chaincfg.Params) (*poolHarness, []spendableOutp SigCache: nil, AddrIndex: nil, ExistsAddrIndex: nil, + OnVoteReceived: nil, }), } diff --git a/mining.go b/mining.go index 355934dd..9359c00d 100644 --- a/mining.go +++ b/mining.go @@ -7,10 +7,14 @@ package main import ( "container/heap" + "context" "encoding/binary" "fmt" "math" + "math/rand" "sort" + "sync" + "sync/atomic" "time" "github.com/decred/dcrd/blockchain" @@ -46,6 +50,23 @@ const ( // kilobyte is the size of a kilobyte. kilobyte = 1000 + + // templateRegenSecs is the required number of seconds elapsed with + // incoming non vote transactions before template regeneration + // is required. + templateRegenSecs = 30 + + // maxTemplateRegenSecs is the maximum number of seconds elapsed + // without incoming transactions before template regeneration is required. + maxTemplateRegenSecs = 60 + + // merkleRootPairSize is the size in bytes of the merkle root + stake root + // of a block. + merkleRootPairSize = 64 + + // templateIDSize is the size in bytes of the merkle root pair + timestamp + // of a block. + templateIDSize = merkleRootPairSize + 8 // 72 bytes ) // txPrioItem houses a transaction along with extra information that allows the @@ -766,7 +787,7 @@ func handleTooFewVoters(subsidyCache *blockchain.SubsidyCache, nextHeight int64, if !best.PrevHash.IsEqual( &curTemplate.Block.Header.PrevBlock) { minrLog.Debugf("Cached mining templates are no longer current, " + - "resetting") + "resetting.") bm.SetCurrentTemplate(nil) bm.SetParentTemplate(nil) } @@ -937,42 +958,6 @@ func handleTooFewVoters(subsidyCache *blockchain.SubsidyCache, nextHeight int64, return nil, nil } -// handleCreatedBlockTemplate stores a successfully created block template to -// the appropriate cache if needed, then returns the template to the miner to -// work on. The stored template is a copy of the template, to prevent races -// from occurring in case the template is mined on by the CPUminer. -func handleCreatedBlockTemplate(blockTemplate *BlockTemplate, bm *blockManager) (*BlockTemplate, error) { - curTemplate := bm.GetCurrentTemplate() - nextBlockHeight := blockTemplate.Height - stakeValidationHeight := bm.server.chainParams.StakeValidationHeight - // This is where we begin storing block templates, when either the - // program is freshly started or the chain is matured to stake - // validation height. - if curTemplate == nil && nextBlockHeight >= stakeValidationHeight-2 { - bm.SetCurrentTemplate(blockTemplate) - } - - // We're at the height where the next block needs to include SSGens, - // so we check to if CachedCurrentTemplate is out of date. If it is, - // we store it as the cached parent template, and store the new block - // template as the currenct template. - if curTemplate != nil && nextBlockHeight >= stakeValidationHeight-1 { - if curTemplate.Height < nextBlockHeight { - bm.SetParentTemplate(curTemplate) - bm.SetCurrentTemplate(blockTemplate) - } - } - - // Overwrite the old cached block if it's out of date. - if curTemplate != nil { - if curTemplate.Height == nextBlockHeight { - bm.SetCurrentTemplate(blockTemplate) - } - } - - return blockTemplate, nil -} - // BlkTmplGenerator generates block templates based on a given mining policy // and a transactions source. It also houses additional state required in // order to ensure the templates adhere to the consensus rules and are built @@ -2053,3 +2038,375 @@ func (g *BlkTmplGenerator) UpdateBlockTime(header *wire.BlockHeader) error { return nil } + +// BgBlkTmplGenerator represents the background process that generates block +// templates and notifies all subscribed clients on template regeneration. +// It generates new templates based on the time elapsed since last template +// regeneration. +type BgBlkTmplGenerator struct { + voteChan chan *wire.MsgTx + notifyChan chan *BlockTemplate + tg *BlkTmplGenerator + txSource mining.TxSource + lastRegen int64 + subscribers map[chan *BlockTemplate]struct{} + miningAddrs []dcrutil.Address + templateMtx sync.Mutex + subscriptionMtx sync.Mutex + currentTemplate *BlockTemplate + parentTemplate *BlockTemplate + templatePool map[[templateIDSize]byte]*wire.MsgBlock + templatePoolMtx sync.RWMutex + chainReorg bool + chainReorgMtx sync.Mutex + permitConnectionlessMining bool + regenScheduler *time.Timer +} + +// newBgBlkTmplGenerator initializes a background block template generator. +func newBgBlkTmplGenerator(tg *BlkTmplGenerator, addrs []dcrutil.Address, permitConnectionlessMining bool) *BgBlkTmplGenerator { + return &BgBlkTmplGenerator{ + voteChan: make(chan *wire.MsgTx), + notifyChan: make(chan *BlockTemplate), + tg: tg, + txSource: tg.txSource, + miningAddrs: addrs, + subscribers: make(map[chan *BlockTemplate]struct{}), + templatePool: make(map[[templateIDSize]byte]*wire.MsgBlock), + permitConnectionlessMining: permitConnectionlessMining, + } +} + +// scheduleRegen schedules a template regeneration by the provided duration +// in the future. If there is an existing schedule in play the sheduled time +// reset to updated the provided duration. +func (g *BgBlkTmplGenerator) scheduleRegen(dt time.Duration) { + if g.regenScheduler != nil { + if !g.regenScheduler.Reset(dt) { + minrLog.Errorf("failed to reset the template regen scheduler") + return + } + } + + g.regenScheduler = time.AfterFunc(dt, func() { + go g.regenTemplate() + g.cancelScheduledRegen() + }) +} + +// cancelScheduledRegen terminates a scheduled template regeneration. +func (g *BgBlkTmplGenerator) cancelScheduledRegen() { + if g.regenScheduler != nil { + g.regenScheduler.Stop() + g.regenScheduler = nil + } +} + +// notifySubscribersHandler updates subscribers of newly created block +// templates. All subscribers are unsubscribed after being updated and +// required to resubscribe after a template update. +func (g *BgBlkTmplGenerator) notifySubscribersHandler(ctx context.Context) { + minrLog.Debug("Starting notify subscribers handler.") + for { + select { + case t := <-g.notifyChan: + g.subscriptionMtx.Lock() + for c := range g.subscribers { + go func() { + c <- t + close(c) + }() + + delete(g.subscribers, c) + } + g.subscriptionMtx.Unlock() + case <-ctx.Done(): + minrLog.Debug("Notify subscribers handler done.") + return + } + } +} + +// RequestTemplateUpdate subscribes a client for a block template update. The +// client is unsubscribed after being updated and required to resubscribe +// for the next block template update. +func (g *BgBlkTmplGenerator) RequestTemplateUpdate() chan *BlockTemplate { + updateChan := make(chan *BlockTemplate, 1) + g.subscriptionMtx.Lock() + g.subscribers[updateChan] = struct{}{} + g.subscriptionMtx.Unlock() + return updateChan +} + +// regenTemplate regenerates the block template. This must be run as a +// goroutine. +func (g *BgBlkTmplGenerator) regenTemplate() { + // NOTE: If the blockchain package is updated to provide more + // fine grained locking, it will be necessary to add a call to + // WaitForChain here. + + // Do not generate block templates if the chain is reorganizing. + g.chainReorgMtx.Lock() + if g.chainReorg { + g.chainReorgMtx.Unlock() + return + } + g.chainReorgMtx.Unlock() + + // Regenerate block templates on mainnet only when the chain is synced. + if !g.tg.chain.IsCurrent() && !g.permitConnectionlessMining { + return + } + + // Pick a mining address at random. + rand.Seed(time.Now().UnixNano()) + payToAddr := g.miningAddrs[rand.Intn(len(g.miningAddrs))] + + // Regenerate the block template. + template, err := g.tg.NewBlockTemplate(payToAddr) + if err != nil { + minrLog.Debugf("block template generation failed: %v", err) + return + } + + msgBlock := *template.Block + + // In order to efficiently store the variations of block templates that + // have been provided to callers, save a pointer to the block keyed by + // the merkle root + stake root + timestamp. This along with the data + // that is included in a work submission is used to rebuild the block + // before checking the submitted solution. + var templateID [templateIDSize]byte + copy(templateID[:chainhash.HashSize], msgBlock.Header.MerkleRoot[:]) + copy(templateID[chainhash.HashSize:], msgBlock.Header.StakeRoot[:]) + timestampNano := make([]byte, 8) + binary.LittleEndian.PutUint64(timestampNano, + uint64(msgBlock.Header.Timestamp.UnixNano())) + copy(templateID[merkleRootPairSize:], timestampNano) + + g.templatePoolMtx.Lock() + g.templatePool[templateID] = &msgBlock + g.templatePoolMtx.Unlock() + + t := *template + g.notifyChan <- &t + + g.templateMtx.Lock() + g.currentTemplate = template + g.templateMtx.Unlock() + + // Update the last template regen time. + atomic.StoreInt64(&g.lastRegen, time.Now().Unix()) +} + +// pruneOldBlockTemplates prunes all block templates with heights lower than +// the height of the best block's parent from the template pool. +func (g *BgBlkTmplGenerator) pruneOldBlockTemplates(bestHeight int64) { + g.templatePoolMtx.Lock() + for rootHash, block := range g.templatePool { + height := int64(block.Header.Height) + if height < bestHeight-1 { + delete(g.templatePool, rootHash) + } + } + g.templatePoolMtx.Unlock() +} + +// updateParentTemplate updates the parent template with the the current +// block template. This must be called when a new block has been connected. +func (g *BgBlkTmplGenerator) updateParentTemplate() { + stakeValidationHeight := g.tg.chainParams.StakeValidationHeight + g.templateMtx.Lock() + if g.currentTemplate != nil { + height := g.currentTemplate.Height + + // Set the parent template if the chain is matured to SVH-1. + if height >= stakeValidationHeight-2 { + g.parentTemplate = g.currentTemplate + minrLog.Debugf("Parent template updated to height: %v", + g.parentTemplate.Height) + } + g.currentTemplate = nil + + // Prune invalidated block templates. + g.pruneOldBlockTemplates(height) + } + g.templateMtx.Unlock() +} + +// handleChainReorgStarted updates the chain reorg state of the background +// template generator when a chain reorganization commences. +func (g *BgBlkTmplGenerator) handleChainReorgStarted() { + g.chainReorgMtx.Lock() + g.chainReorg = true + g.chainReorgMtx.Unlock() +} + +// handleChainReorgDone updates the chain reorg state of the background +// template generator and immdiately regenerates the block template +// when a chain reorganization concludes. +func (g *BgBlkTmplGenerator) handleChainReorgDone() { + g.chainReorgMtx.Lock() + g.chainReorg = false + g.chainReorgMtx.Unlock() + + // Regenerate block template. + go g.regenTemplate() +} + +// handleDisconnectedBlock empties the template pool and updates the cached +// templates based on the disconnected block's height. +func (g *BgBlkTmplGenerator) handleDisconnectedBlock(discHeight int64) { + // Remove all block templates in the template pool. + g.templatePoolMtx.Lock() + for rootHash := range g.templatePool { + delete(g.templatePool, rootHash) + } + g.templatePoolMtx.Unlock() + + g.templateMtx.Lock() + // Unset the current template. + g.currentTemplate = nil + + // Unset the parent template if it shares the same height as the + // disconnected block. + if g.parentTemplate != nil && g.parentTemplate.Height == discHeight { + g.parentTemplate = nil + } + g.templateMtx.Unlock() +} + +// handleConnectedBlock updates the parent template and generates a new block +// template if the connected block height is below SVH-1. +func (g *BgBlkTmplGenerator) handleConnectedBlock(connHeight int64) { + // Update the cached parent template. + g.updateParentTemplate() + + // Generate a new block template if the connected block height + // is below SVH-1. + if connHeight <= g.tg.chainParams.StakeValidationHeight-2 { + go g.regenTemplate() + } +} + +// onVoteReceivedHandler triggers block template regeneration based on +// votes received by the mempool. This must be run as a goroutine. +func (g *BgBlkTmplGenerator) onVoteReceivedHandler(ctx context.Context) { + minrLog.Debug("Starting vote received handler.") + for { + select { + case voteTx := <-g.voteChan: + // Fetch the block height and hash of the block being voted on by + // the new vote received. + votedOnHash, votedOnHeight := stake.SSGenBlockVotedOn(voteTx) + best := g.tg.chain.BestSnapshot() + + if votedOnHash.IsEqual(&best.Hash) && + votedOnHeight == uint32(best.Height) { + currTipVotes := + len(g.tg.txSource.VoteHashesForBlock(&best.Hash)) + + // Regenerate the block template immediately if the block being + // voted on has the maximum number of votes possible. + if currTipVotes == int(g.tg.chainParams.TicketsPerBlock) { + g.cancelScheduledRegen() + g.regenTemplate() + } + + // Schedule a block template regeneration if the new vote received + // is voting on the current chain tip with at least the minimum + // required number of votes by the network. + if currTipVotes >= + int((g.tg.chainParams.TicketsPerBlock/2)+1) { + g.scheduleRegen(time.Second) + } + } + + if !votedOnHash.IsEqual(&best.Hash) && + votedOnHeight != uint32(best.Height) { + currTipVotes := + len(g.tg.txSource.VoteHashesForBlock(&best.Hash)) + votedOnVotes := + len(g.tg.txSource.VoteHashesForBlock(&votedOnHash)) + if votedOnVotes > currTipVotes { + // Regenerate the block template immediately if the block being + // voted on has the maximum number of votes possible. + if votedOnVotes == int(g.tg.chainParams.TicketsPerBlock) { + g.cancelScheduledRegen() + g.regenTemplate() + } + + // Schdule a block template if the new vote received is voting + // on a side chain tip with at least the minimum required number + // of votes by the network and more votes than the current chain + // tip. + if votedOnVotes >= + int((g.tg.chainParams.TicketsPerBlock/2)+1) { + g.scheduleRegen(time.Second) + } + } + } + + case <-ctx.Done(): + minrLog.Debug("Vote received handler done.") + return + } + } +} + +// OnVoteReceived signals a new vote received by the mempool. +func (g *BgBlkTmplGenerator) OnVoteReceived(tx *wire.MsgTx) { + g.voteChan <- tx +} + +// generator generates new templates based on mempool activity for vote +// and non-vote transactions and the time elapsed since the last template +// regeneration. This must be run as a goroutine. +func (g *BgBlkTmplGenerator) generator(ctx context.Context) { + var isTicking bool + ticker := time.NewTicker(time.Millisecond * 500) + defer ticker.Stop() + + // Immediately generate a block template if the chain is below SVH-1. + if g.tg.chain.BestSnapshot().Height <= + g.tg.chainParams.StakeValidationHeight-2 { + g.regenTemplate() + } + + for { + select { + case <-ticker.C: + lastRegen := atomic.LoadInt64(&g.lastRegen) + lastUpdated := g.tg.txSource.LastUpdated().Unix() + now := time.Now().Unix() + sinceLastRegen := now - lastRegen + + if !isTicking { + atomic.StoreInt64(&g.lastRegen, now) + sinceLastRegen = 0 + isTicking = true + } + + // Regenerate the block template if the mempool was updated after + // last template regeneration and 30 seconds has elapsed or 60 + // seconds has elapsed since the last template regeneration. + if (lastUpdated > lastRegen && sinceLastRegen > + templateRegenSecs) || sinceLastRegen > maxTemplateRegenSecs { + g.regenTemplate() + } + + case <-ctx.Done(): + minrLog.Debug("Background block template generator done") + return + } + } +} + +// Start runs the background block template generator as a goroutine using +// the provided context. +func (g *BgBlkTmplGenerator) Start(ctx context.Context) { + minrLog.Trace("Starting background block template generator") + go g.generator(ctx) + go g.notifySubscribersHandler(ctx) + go g.onVoteReceivedHandler(ctx) +} diff --git a/rpcserver.go b/rpcserver.go index b348a3d6..43806e6e 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -99,9 +99,6 @@ const ( // in the memory pool. gbtRegenerateSeconds = 60 - // merkleRootPairSize - merkleRootPairSize = 64 - // sstxCommitmentString is the string to insert when a verbose // transaction output's pkscript type is a ticket commitment. sstxCommitmentString = "sstxcommitment" diff --git a/rpcserver_test.go b/rpcserver_test.go index 52615c02..61322af8 100644 --- a/rpcserver_test.go +++ b/rpcserver_test.go @@ -39,7 +39,7 @@ func testGetBestBlock(r *rpctest.Harness, t *testing.T) { // Hash should be the same as the newly submitted block. if !bytes.Equal(bestHash[:], generatedBlockHashes[0][:]) { t.Fatalf("Block hashes do not match. Returned hash %v, wanted "+ - "hash %v", bestHash, generatedBlockHashes[0][:]) + "hash %v", bestHash, generatedBlockHashes[0]) } // Block height should now reflect newest height. @@ -91,7 +91,7 @@ func testGetBlockHash(r *rpctest.Harness, t *testing.T) { // Block hashes should match newly created block. if !bytes.Equal(generatedBlockHashes[0][:], blockHash[:]) { t.Fatalf("Block hashes do not match. Returned hash %v, wanted "+ - "hash %v", blockHash, generatedBlockHashes[0][:]) + "hash %v", blockHash, generatedBlockHashes[0]) } } diff --git a/server.go b/server.go index e56b0f69..7be356f4 100644 --- a/server.go +++ b/server.go @@ -6,6 +6,7 @@ package main import ( + "context" "crypto/rand" "encoding/binary" "errors" @@ -193,6 +194,7 @@ type server struct { sigCache *txscript.SigCache rpcServer *rpcServer blockManager *blockManager + bg *BgBlkTmplGenerator txMemPool *mempool.TxPool feeEstimator *fees.Estimator cpuMiner *CPUMiner @@ -210,6 +212,8 @@ type server struct { db database.DB timeSource blockchain.MedianTimeSource services wire.ServiceFlag + context context.Context + cancel context.CancelFunc // The following fields are used for optional indexes. They will be nil // if the associated index is not enabled. These fields are set during @@ -2128,6 +2132,12 @@ func (s *server) Start() { s.rpcServer.Start() } + // Start the background block template generator if the config provides + // a mining address. + if len(cfg.MiningAddrs) > 0 { + s.bg.Start(s.context) + } + // Start the CPU miner if generation is enabled. if cfg.Generate { s.cpuMiner.Start() @@ -2145,6 +2155,12 @@ func (s *server) Stop() error { srvrLog.Warnf("Server shutting down") + // Stop the background block template generator. + if len(cfg.miningAddrs) > 0 { + minrLog.Info("Shutting down background block template generator.") + s.cancel() + } + // Stop the CPU miner if needed. if cfg.Generate && s.cpuMiner != nil { s.cpuMiner.Stop() @@ -2471,6 +2487,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } } + ctx, cancel := context.WithCancel(context.Background()) s := server{ chainParams: chainParams, addrManager: amgr, @@ -2488,6 +2505,8 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param timeSource: blockchain.NewMedianTime(), services: services, sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize), + context: ctx, + cancel: cancel, } // Create the transaction and address indexes if needed. @@ -2592,6 +2611,11 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param ExistsAddrIndex: s.existsAddrIndex, AddTxToFeeEstimation: s.feeEstimator.AddMemPoolTransaction, RemoveTxFromFeeEstimation: s.feeEstimator.RemoveMemPoolTransaction, + OnVoteReceived: func(voteTx *wire.MsgTx) { + if s.bg != nil { + s.bg.OnVoteReceived(voteTx) + } + }, } s.txMemPool = mempool.New(&txC) @@ -2606,12 +2630,19 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param BlockPrioritySize: cfg.BlockPrioritySize, TxMinFreeFee: cfg.minRelayTxFee, } - blockTemplateGenerator := newBlkTmplGenerator(&policy, s.txMemPool, - s.timeSource, s.sigCache, s.chainParams, bm.chain, bm) + tg := newBlkTmplGenerator(&policy, s.txMemPool, s.timeSource, s.sigCache, + s.chainParams, bm.chain, bm) + + // Create the background block template generator if the config has a + // mining address. + if len(cfg.miningAddrs) > 0 { + s.bg = newBgBlkTmplGenerator(tg, cfg.miningAddrs, cfg.SimNet) + } + s.cpuMiner = newCPUMiner(&cpuminerConfig{ ChainParams: s.chainParams, PermitConnectionlessMining: cfg.SimNet, - BlockTemplateGenerator: blockTemplateGenerator, + BlockTemplateGenerator: tg, MiningAddrs: cfg.miningAddrs, ProcessBlock: bm.ProcessBlock, ConnectedCount: s.ConnectedCount, @@ -2701,8 +2732,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param } if !cfg.DisableRPC { - s.rpcServer, err = newRPCServer(cfg.RPCListeners, - blockTemplateGenerator, &s) + s.rpcServer, err = newRPCServer(cfg.RPCListeners, tg, &s) if err != nil { return nil, err }