multi: add BgBlkTmplGenerator.

BgBlkTmplGenerator represents the background process that
generates block templates and notifies all subscribed clients
on template regeneration. It generates new templates based
on mempool activity for vote and non-vote transactions and
the time elapsed since last template regeneration.

This also adds a template pool to the background block
generator for recreating submitted blocks.
This commit is contained in:
Donald Adu-Poku 2019-03-30 18:51:28 +00:00 committed by Dave Collins
parent c1cf192f03
commit 8f5019e083
7 changed files with 463 additions and 47 deletions

View File

@ -1891,6 +1891,10 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
r.ntfnMgr.NotifyBlockConnected(block)
}
if b.server.bg != nil {
b.server.bg.handleConnectedBlock(block.Height())
}
// Stake tickets are spent or missed from the most recently connected block.
case blockchain.NTSpentAndMissedTickets:
tnd, ok := notification.Data.(*blockchain.TicketNotificationsData)
@ -1982,6 +1986,14 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
handleDisconnectedBlockTxns(block.Transactions()[1:])
handleDisconnectedBlockTxns(block.STransactions())
if b.server.bg != nil {
b.server.bg.handleDisconnectedBlock(block.Height())
}
// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()
// Notify registered websocket clients.
if r := b.server.rpcServer; r != nil {
// Filter and update the rebroadcast inventory.
b.server.PruneRebroadcastInventory()
@ -1990,6 +2002,18 @@ func (b *blockManager) handleNotifyMsg(notification *blockchain.Notification) {
r.ntfnMgr.NotifyBlockDisconnected(block)
}
// Chain reorganization has commenced.
case blockchain.NTChainReorgStarted:
if b.server.bg != nil {
b.server.bg.handleChainReorgStarted()
}
// Chain reorganization has concluded.
case blockchain.NTChainReorgDone:
if b.server.bg != nil {
b.server.bg.handleChainReorgDone()
}
// The blockchain is reorganizing.
case blockchain.NTReorganization:
rd, ok := notification.Data.(*blockchain.ReorganizationNtfnsData)

View File

@ -125,6 +125,10 @@ type Config struct {
// whenever a transaction is removed from the mempool in order to track fee
// estimation.
RemoveTxFromFeeEstimation func(txHash *chainhash.Hash)
// OnVoteReceived defines the function used to signal receiving a new
// vote in the mempool.
OnVoteReceived func(voteTx *wire.MsgTx)
}
// Policy houses the policy (configuration parameters) which is used to
@ -676,6 +680,9 @@ func (mp *TxPool) RemoveDoubleSpends(tx *dcrutil.Tx) {
// This function MUST be called with the mempool lock held (for writes).
func (mp *TxPool) addTransaction(utxoView *blockchain.UtxoViewpoint,
tx *dcrutil.Tx, txType stake.TxType, height int64, fee int64) {
if mp.cfg.OnVoteReceived != nil && txType == stake.TxTypeSSGen {
mp.cfg.OnVoteReceived(tx.MsgTx())
}
// Add the transaction to the pool and mark the referenced outpoints
// as spent by the pool.

View File

@ -752,6 +752,7 @@ func newPoolHarness(chainParams *chaincfg.Params) (*poolHarness, []spendableOutp
SigCache: nil,
AddrIndex: nil,
ExistsAddrIndex: nil,
OnVoteReceived: nil,
}),
}

431
mining.go
View File

@ -7,10 +7,14 @@ package main
import (
"container/heap"
"context"
"encoding/binary"
"fmt"
"math"
"math/rand"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/decred/dcrd/blockchain"
@ -46,6 +50,23 @@ const (
// kilobyte is the size of a kilobyte.
kilobyte = 1000
// templateRegenSecs is the required number of seconds elapsed with
// incoming non vote transactions before template regeneration
// is required.
templateRegenSecs = 30
// maxTemplateRegenSecs is the maximum number of seconds elapsed
// without incoming transactions before template regeneration is required.
maxTemplateRegenSecs = 60
// merkleRootPairSize is the size in bytes of the merkle root + stake root
// of a block.
merkleRootPairSize = 64
// templateIDSize is the size in bytes of the merkle root pair + timestamp
// of a block.
templateIDSize = merkleRootPairSize + 8 // 72 bytes
)
// txPrioItem houses a transaction along with extra information that allows the
@ -766,7 +787,7 @@ func handleTooFewVoters(subsidyCache *blockchain.SubsidyCache, nextHeight int64,
if !best.PrevHash.IsEqual(
&curTemplate.Block.Header.PrevBlock) {
minrLog.Debugf("Cached mining templates are no longer current, " +
"resetting")
"resetting.")
bm.SetCurrentTemplate(nil)
bm.SetParentTemplate(nil)
}
@ -937,42 +958,6 @@ func handleTooFewVoters(subsidyCache *blockchain.SubsidyCache, nextHeight int64,
return nil, nil
}
// handleCreatedBlockTemplate stores a successfully created block template to
// the appropriate cache if needed, then returns the template to the miner to
// work on. The stored template is a copy of the template, to prevent races
// from occurring in case the template is mined on by the CPUminer.
func handleCreatedBlockTemplate(blockTemplate *BlockTemplate, bm *blockManager) (*BlockTemplate, error) {
curTemplate := bm.GetCurrentTemplate()
nextBlockHeight := blockTemplate.Height
stakeValidationHeight := bm.server.chainParams.StakeValidationHeight
// This is where we begin storing block templates, when either the
// program is freshly started or the chain is matured to stake
// validation height.
if curTemplate == nil && nextBlockHeight >= stakeValidationHeight-2 {
bm.SetCurrentTemplate(blockTemplate)
}
// We're at the height where the next block needs to include SSGens,
// so we check to if CachedCurrentTemplate is out of date. If it is,
// we store it as the cached parent template, and store the new block
// template as the currenct template.
if curTemplate != nil && nextBlockHeight >= stakeValidationHeight-1 {
if curTemplate.Height < nextBlockHeight {
bm.SetParentTemplate(curTemplate)
bm.SetCurrentTemplate(blockTemplate)
}
}
// Overwrite the old cached block if it's out of date.
if curTemplate != nil {
if curTemplate.Height == nextBlockHeight {
bm.SetCurrentTemplate(blockTemplate)
}
}
return blockTemplate, nil
}
// BlkTmplGenerator generates block templates based on a given mining policy
// and a transactions source. It also houses additional state required in
// order to ensure the templates adhere to the consensus rules and are built
@ -2053,3 +2038,375 @@ func (g *BlkTmplGenerator) UpdateBlockTime(header *wire.BlockHeader) error {
return nil
}
// BgBlkTmplGenerator represents the background process that generates block
// templates and notifies all subscribed clients on template regeneration.
// It generates new templates based on the time elapsed since last template
// regeneration.
type BgBlkTmplGenerator struct {
voteChan chan *wire.MsgTx
notifyChan chan *BlockTemplate
tg *BlkTmplGenerator
txSource mining.TxSource
lastRegen int64
subscribers map[chan *BlockTemplate]struct{}
miningAddrs []dcrutil.Address
templateMtx sync.Mutex
subscriptionMtx sync.Mutex
currentTemplate *BlockTemplate
parentTemplate *BlockTemplate
templatePool map[[templateIDSize]byte]*wire.MsgBlock
templatePoolMtx sync.RWMutex
chainReorg bool
chainReorgMtx sync.Mutex
permitConnectionlessMining bool
regenScheduler *time.Timer
}
// newBgBlkTmplGenerator initializes a background block template generator.
func newBgBlkTmplGenerator(tg *BlkTmplGenerator, addrs []dcrutil.Address, permitConnectionlessMining bool) *BgBlkTmplGenerator {
return &BgBlkTmplGenerator{
voteChan: make(chan *wire.MsgTx),
notifyChan: make(chan *BlockTemplate),
tg: tg,
txSource: tg.txSource,
miningAddrs: addrs,
subscribers: make(map[chan *BlockTemplate]struct{}),
templatePool: make(map[[templateIDSize]byte]*wire.MsgBlock),
permitConnectionlessMining: permitConnectionlessMining,
}
}
// scheduleRegen schedules a template regeneration by the provided duration
// in the future. If there is an existing schedule in play the sheduled time
// reset to updated the provided duration.
func (g *BgBlkTmplGenerator) scheduleRegen(dt time.Duration) {
if g.regenScheduler != nil {
if !g.regenScheduler.Reset(dt) {
minrLog.Errorf("failed to reset the template regen scheduler")
return
}
}
g.regenScheduler = time.AfterFunc(dt, func() {
go g.regenTemplate()
g.cancelScheduledRegen()
})
}
// cancelScheduledRegen terminates a scheduled template regeneration.
func (g *BgBlkTmplGenerator) cancelScheduledRegen() {
if g.regenScheduler != nil {
g.regenScheduler.Stop()
g.regenScheduler = nil
}
}
// notifySubscribersHandler updates subscribers of newly created block
// templates. All subscribers are unsubscribed after being updated and
// required to resubscribe after a template update.
func (g *BgBlkTmplGenerator) notifySubscribersHandler(ctx context.Context) {
minrLog.Debug("Starting notify subscribers handler.")
for {
select {
case t := <-g.notifyChan:
g.subscriptionMtx.Lock()
for c := range g.subscribers {
go func() {
c <- t
close(c)
}()
delete(g.subscribers, c)
}
g.subscriptionMtx.Unlock()
case <-ctx.Done():
minrLog.Debug("Notify subscribers handler done.")
return
}
}
}
// RequestTemplateUpdate subscribes a client for a block template update. The
// client is unsubscribed after being updated and required to resubscribe
// for the next block template update.
func (g *BgBlkTmplGenerator) RequestTemplateUpdate() chan *BlockTemplate {
updateChan := make(chan *BlockTemplate, 1)
g.subscriptionMtx.Lock()
g.subscribers[updateChan] = struct{}{}
g.subscriptionMtx.Unlock()
return updateChan
}
// regenTemplate regenerates the block template. This must be run as a
// goroutine.
func (g *BgBlkTmplGenerator) regenTemplate() {
// NOTE: If the blockchain package is updated to provide more
// fine grained locking, it will be necessary to add a call to
// WaitForChain here.
// Do not generate block templates if the chain is reorganizing.
g.chainReorgMtx.Lock()
if g.chainReorg {
g.chainReorgMtx.Unlock()
return
}
g.chainReorgMtx.Unlock()
// Regenerate block templates on mainnet only when the chain is synced.
if !g.tg.chain.IsCurrent() && !g.permitConnectionlessMining {
return
}
// Pick a mining address at random.
rand.Seed(time.Now().UnixNano())
payToAddr := g.miningAddrs[rand.Intn(len(g.miningAddrs))]
// Regenerate the block template.
template, err := g.tg.NewBlockTemplate(payToAddr)
if err != nil {
minrLog.Debugf("block template generation failed: %v", err)
return
}
msgBlock := *template.Block
// In order to efficiently store the variations of block templates that
// have been provided to callers, save a pointer to the block keyed by
// the merkle root + stake root + timestamp. This along with the data
// that is included in a work submission is used to rebuild the block
// before checking the submitted solution.
var templateID [templateIDSize]byte
copy(templateID[:chainhash.HashSize], msgBlock.Header.MerkleRoot[:])
copy(templateID[chainhash.HashSize:], msgBlock.Header.StakeRoot[:])
timestampNano := make([]byte, 8)
binary.LittleEndian.PutUint64(timestampNano,
uint64(msgBlock.Header.Timestamp.UnixNano()))
copy(templateID[merkleRootPairSize:], timestampNano)
g.templatePoolMtx.Lock()
g.templatePool[templateID] = &msgBlock
g.templatePoolMtx.Unlock()
t := *template
g.notifyChan <- &t
g.templateMtx.Lock()
g.currentTemplate = template
g.templateMtx.Unlock()
// Update the last template regen time.
atomic.StoreInt64(&g.lastRegen, time.Now().Unix())
}
// pruneOldBlockTemplates prunes all block templates with heights lower than
// the height of the best block's parent from the template pool.
func (g *BgBlkTmplGenerator) pruneOldBlockTemplates(bestHeight int64) {
g.templatePoolMtx.Lock()
for rootHash, block := range g.templatePool {
height := int64(block.Header.Height)
if height < bestHeight-1 {
delete(g.templatePool, rootHash)
}
}
g.templatePoolMtx.Unlock()
}
// updateParentTemplate updates the parent template with the the current
// block template. This must be called when a new block has been connected.
func (g *BgBlkTmplGenerator) updateParentTemplate() {
stakeValidationHeight := g.tg.chainParams.StakeValidationHeight
g.templateMtx.Lock()
if g.currentTemplate != nil {
height := g.currentTemplate.Height
// Set the parent template if the chain is matured to SVH-1.
if height >= stakeValidationHeight-2 {
g.parentTemplate = g.currentTemplate
minrLog.Debugf("Parent template updated to height: %v",
g.parentTemplate.Height)
}
g.currentTemplate = nil
// Prune invalidated block templates.
g.pruneOldBlockTemplates(height)
}
g.templateMtx.Unlock()
}
// handleChainReorgStarted updates the chain reorg state of the background
// template generator when a chain reorganization commences.
func (g *BgBlkTmplGenerator) handleChainReorgStarted() {
g.chainReorgMtx.Lock()
g.chainReorg = true
g.chainReorgMtx.Unlock()
}
// handleChainReorgDone updates the chain reorg state of the background
// template generator and immdiately regenerates the block template
// when a chain reorganization concludes.
func (g *BgBlkTmplGenerator) handleChainReorgDone() {
g.chainReorgMtx.Lock()
g.chainReorg = false
g.chainReorgMtx.Unlock()
// Regenerate block template.
go g.regenTemplate()
}
// handleDisconnectedBlock empties the template pool and updates the cached
// templates based on the disconnected block's height.
func (g *BgBlkTmplGenerator) handleDisconnectedBlock(discHeight int64) {
// Remove all block templates in the template pool.
g.templatePoolMtx.Lock()
for rootHash := range g.templatePool {
delete(g.templatePool, rootHash)
}
g.templatePoolMtx.Unlock()
g.templateMtx.Lock()
// Unset the current template.
g.currentTemplate = nil
// Unset the parent template if it shares the same height as the
// disconnected block.
if g.parentTemplate != nil && g.parentTemplate.Height == discHeight {
g.parentTemplate = nil
}
g.templateMtx.Unlock()
}
// handleConnectedBlock updates the parent template and generates a new block
// template if the connected block height is below SVH-1.
func (g *BgBlkTmplGenerator) handleConnectedBlock(connHeight int64) {
// Update the cached parent template.
g.updateParentTemplate()
// Generate a new block template if the connected block height
// is below SVH-1.
if connHeight <= g.tg.chainParams.StakeValidationHeight-2 {
go g.regenTemplate()
}
}
// onVoteReceivedHandler triggers block template regeneration based on
// votes received by the mempool. This must be run as a goroutine.
func (g *BgBlkTmplGenerator) onVoteReceivedHandler(ctx context.Context) {
minrLog.Debug("Starting vote received handler.")
for {
select {
case voteTx := <-g.voteChan:
// Fetch the block height and hash of the block being voted on by
// the new vote received.
votedOnHash, votedOnHeight := stake.SSGenBlockVotedOn(voteTx)
best := g.tg.chain.BestSnapshot()
if votedOnHash.IsEqual(&best.Hash) &&
votedOnHeight == uint32(best.Height) {
currTipVotes :=
len(g.tg.txSource.VoteHashesForBlock(&best.Hash))
// Regenerate the block template immediately if the block being
// voted on has the maximum number of votes possible.
if currTipVotes == int(g.tg.chainParams.TicketsPerBlock) {
g.cancelScheduledRegen()
g.regenTemplate()
}
// Schedule a block template regeneration if the new vote received
// is voting on the current chain tip with at least the minimum
// required number of votes by the network.
if currTipVotes >=
int((g.tg.chainParams.TicketsPerBlock/2)+1) {
g.scheduleRegen(time.Second)
}
}
if !votedOnHash.IsEqual(&best.Hash) &&
votedOnHeight != uint32(best.Height) {
currTipVotes :=
len(g.tg.txSource.VoteHashesForBlock(&best.Hash))
votedOnVotes :=
len(g.tg.txSource.VoteHashesForBlock(&votedOnHash))
if votedOnVotes > currTipVotes {
// Regenerate the block template immediately if the block being
// voted on has the maximum number of votes possible.
if votedOnVotes == int(g.tg.chainParams.TicketsPerBlock) {
g.cancelScheduledRegen()
g.regenTemplate()
}
// Schdule a block template if the new vote received is voting
// on a side chain tip with at least the minimum required number
// of votes by the network and more votes than the current chain
// tip.
if votedOnVotes >=
int((g.tg.chainParams.TicketsPerBlock/2)+1) {
g.scheduleRegen(time.Second)
}
}
}
case <-ctx.Done():
minrLog.Debug("Vote received handler done.")
return
}
}
}
// OnVoteReceived signals a new vote received by the mempool.
func (g *BgBlkTmplGenerator) OnVoteReceived(tx *wire.MsgTx) {
g.voteChan <- tx
}
// generator generates new templates based on mempool activity for vote
// and non-vote transactions and the time elapsed since the last template
// regeneration. This must be run as a goroutine.
func (g *BgBlkTmplGenerator) generator(ctx context.Context) {
var isTicking bool
ticker := time.NewTicker(time.Millisecond * 500)
defer ticker.Stop()
// Immediately generate a block template if the chain is below SVH-1.
if g.tg.chain.BestSnapshot().Height <=
g.tg.chainParams.StakeValidationHeight-2 {
g.regenTemplate()
}
for {
select {
case <-ticker.C:
lastRegen := atomic.LoadInt64(&g.lastRegen)
lastUpdated := g.tg.txSource.LastUpdated().Unix()
now := time.Now().Unix()
sinceLastRegen := now - lastRegen
if !isTicking {
atomic.StoreInt64(&g.lastRegen, now)
sinceLastRegen = 0
isTicking = true
}
// Regenerate the block template if the mempool was updated after
// last template regeneration and 30 seconds has elapsed or 60
// seconds has elapsed since the last template regeneration.
if (lastUpdated > lastRegen && sinceLastRegen >
templateRegenSecs) || sinceLastRegen > maxTemplateRegenSecs {
g.regenTemplate()
}
case <-ctx.Done():
minrLog.Debug("Background block template generator done")
return
}
}
}
// Start runs the background block template generator as a goroutine using
// the provided context.
func (g *BgBlkTmplGenerator) Start(ctx context.Context) {
minrLog.Trace("Starting background block template generator")
go g.generator(ctx)
go g.notifySubscribersHandler(ctx)
go g.onVoteReceivedHandler(ctx)
}

View File

@ -99,9 +99,6 @@ const (
// in the memory pool.
gbtRegenerateSeconds = 60
// merkleRootPairSize
merkleRootPairSize = 64
// sstxCommitmentString is the string to insert when a verbose
// transaction output's pkscript type is a ticket commitment.
sstxCommitmentString = "sstxcommitment"

View File

@ -39,7 +39,7 @@ func testGetBestBlock(r *rpctest.Harness, t *testing.T) {
// Hash should be the same as the newly submitted block.
if !bytes.Equal(bestHash[:], generatedBlockHashes[0][:]) {
t.Fatalf("Block hashes do not match. Returned hash %v, wanted "+
"hash %v", bestHash, generatedBlockHashes[0][:])
"hash %v", bestHash, generatedBlockHashes[0])
}
// Block height should now reflect newest height.
@ -91,7 +91,7 @@ func testGetBlockHash(r *rpctest.Harness, t *testing.T) {
// Block hashes should match newly created block.
if !bytes.Equal(generatedBlockHashes[0][:], blockHash[:]) {
t.Fatalf("Block hashes do not match. Returned hash %v, wanted "+
"hash %v", blockHash, generatedBlockHashes[0][:])
"hash %v", blockHash, generatedBlockHashes[0])
}
}

View File

@ -6,6 +6,7 @@
package main
import (
"context"
"crypto/rand"
"encoding/binary"
"errors"
@ -193,6 +194,7 @@ type server struct {
sigCache *txscript.SigCache
rpcServer *rpcServer
blockManager *blockManager
bg *BgBlkTmplGenerator
txMemPool *mempool.TxPool
feeEstimator *fees.Estimator
cpuMiner *CPUMiner
@ -210,6 +212,8 @@ type server struct {
db database.DB
timeSource blockchain.MedianTimeSource
services wire.ServiceFlag
context context.Context
cancel context.CancelFunc
// The following fields are used for optional indexes. They will be nil
// if the associated index is not enabled. These fields are set during
@ -2128,6 +2132,12 @@ func (s *server) Start() {
s.rpcServer.Start()
}
// Start the background block template generator if the config provides
// a mining address.
if len(cfg.MiningAddrs) > 0 {
s.bg.Start(s.context)
}
// Start the CPU miner if generation is enabled.
if cfg.Generate {
s.cpuMiner.Start()
@ -2145,6 +2155,12 @@ func (s *server) Stop() error {
srvrLog.Warnf("Server shutting down")
// Stop the background block template generator.
if len(cfg.miningAddrs) > 0 {
minrLog.Info("Shutting down background block template generator.")
s.cancel()
}
// Stop the CPU miner if needed.
if cfg.Generate && s.cpuMiner != nil {
s.cpuMiner.Stop()
@ -2471,6 +2487,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
}
}
ctx, cancel := context.WithCancel(context.Background())
s := server{
chainParams: chainParams,
addrManager: amgr,
@ -2488,6 +2505,8 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
timeSource: blockchain.NewMedianTime(),
services: services,
sigCache: txscript.NewSigCache(cfg.SigCacheMaxSize),
context: ctx,
cancel: cancel,
}
// Create the transaction and address indexes if needed.
@ -2592,6 +2611,11 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
ExistsAddrIndex: s.existsAddrIndex,
AddTxToFeeEstimation: s.feeEstimator.AddMemPoolTransaction,
RemoveTxFromFeeEstimation: s.feeEstimator.RemoveMemPoolTransaction,
OnVoteReceived: func(voteTx *wire.MsgTx) {
if s.bg != nil {
s.bg.OnVoteReceived(voteTx)
}
},
}
s.txMemPool = mempool.New(&txC)
@ -2606,12 +2630,19 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
BlockPrioritySize: cfg.BlockPrioritySize,
TxMinFreeFee: cfg.minRelayTxFee,
}
blockTemplateGenerator := newBlkTmplGenerator(&policy, s.txMemPool,
s.timeSource, s.sigCache, s.chainParams, bm.chain, bm)
tg := newBlkTmplGenerator(&policy, s.txMemPool, s.timeSource, s.sigCache,
s.chainParams, bm.chain, bm)
// Create the background block template generator if the config has a
// mining address.
if len(cfg.miningAddrs) > 0 {
s.bg = newBgBlkTmplGenerator(tg, cfg.miningAddrs, cfg.SimNet)
}
s.cpuMiner = newCPUMiner(&cpuminerConfig{
ChainParams: s.chainParams,
PermitConnectionlessMining: cfg.SimNet,
BlockTemplateGenerator: blockTemplateGenerator,
BlockTemplateGenerator: tg,
MiningAddrs: cfg.miningAddrs,
ProcessBlock: bm.ProcessBlock,
ConnectedCount: s.ConnectedCount,
@ -2701,8 +2732,7 @@ func newServer(listenAddrs []string, db database.DB, chainParams *chaincfg.Param
}
if !cfg.DisableRPC {
s.rpcServer, err = newRPCServer(cfg.RPCListeners,
blockTemplateGenerator, &s)
s.rpcServer, err = newRPCServer(cfg.RPCListeners, tg, &s)
if err != nil {
return nil, err
}