dcrd/chainindexer.go
Dave Collins b6d426241d blockchain: Rework to use new db interface.
This commit is the first stage of several that are planned to convert
the blockchain package into a concurrent safe package that will
ultimately allow support for multi-peer download and concurrent chain
processing.  The goal is to update btcd proper after each step so it can
take advantage of the enhancements as they are developed.

In addition to the aforementioned benefit, this staged approach has been
chosen since it is absolutely critical to maintain consensus.
Separating the changes into several stages makes it easier for reviewers
to logically follow what is happening and therefore helps prevent
consensus bugs.  Naturally there are significant automated tests to help
prevent consensus issues as well.

The main focus of this stage is to convert the blockchain package to use
the new database interface and implement the chain-related functionality
which it no longer handles.  It also aims to improve efficiency in
various areas by making use of the new database and chain capabilities.

The following is an overview of the chain changes:

- Update to use the new database interface
- Add chain-related functionality that the old database used to handle
  - Main chain structure and state
  - Transaction spend tracking
- Implement a new pruned unspent transaction output (utxo) set
  - Provides efficient direct access to the unspent transaction outputs
  - Uses a domain specific compression algorithm that understands the
    standard transaction scripts in order to significantly compress them
  - Removes reliance on the transaction index and paves the way toward
    eventually enabling block pruning
- Modify the New function to accept a Config struct instead of
  individual parameters (see the sketch after this list)
- Replace the old TxStore type with a new UtxoViewpoint type that makes
  use of the new pruned utxo set
- Convert code to treat the new UtxoViewpoint as a rolling view that is
  used between connects and disconnects to improve efficiency
- Make best chain state always set when the chain instance is created
  - Remove now unnecessary logic for dealing with unset best state
- Make all exported functions concurrent safe
  - Currently using a single chain state lock as it provides a
    straightforward and easy to review path forward; however, this can be
    improved with more fine-grained locking
- Optimize various cases where full blocks were being loaded when only
  the header is needed to help reduce the I/O load
- Add the ability for callers to get a snapshot of the current best
  chain stats in a concurrent safe fashion
  - Does not block callers while new blocks are being processed
- Make error messages that reference transaction outputs consistently
  use <transaction hash>:<output index>
- Introduce a new AssertError type and convert internal consistency
  checks to use it
- Update tests and examples to reflect the changes
- Add a full suite of tests to ensure correct functionality of the new
  code
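
As a rough illustration of the reworked chain API described above, the
following sketch constructs a chain instance from a Config struct and reads
the concurrent safe best chain snapshot.  The exact identifiers used here
(database.DB, the Config field names, NewMedianTime, BestSnapshot, and the
snapshot fields) are assumptions made for illustration, not verbatim copies
of the new package:

    // Sketch only: field and method names are assumptions based on the
    // description above.
    func exampleBestState(db database.DB) error {
        chain, err := blockchain.New(&blockchain.Config{
            DB:          db,
            ChainParams: activeNetParams.Params,
            TimeSource:  blockchain.NewMedianTime(),
        })
        if err != nil {
            return err
        }

        // The snapshot is safe to read concurrently and does not block
        // while new blocks are being processed.
        best := chain.BestSnapshot()
        fmt.Printf("best block %v at height %d\n", best.Hash, best.Height)
        return nil
    }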

The following is an overview of the btcd changes:

- Update to use the new database and chain interfaces
- Temporarily remove all code related to the transaction index
- Temporarily remove all code related to the address index
- Convert all code that uses transaction stores to use the new utxo
  view
- Rework several calls that required the block manager for safe
  concurrency to use the chain package directly now that it is
  concurrent safe
- Change all calls to obtain the best hash to use the new best state
  snapshot capability from the chain package
- Remove workaround for limits on fetching height ranges since the new
  database interface no longer imposes them
- Correct the gettxout RPC handler to return the best chain hash as
  opposed to the hash the txout was found in
- Optimize various RPC handlers:
  - Change several of the RPC handlers to use the new chain snapshot
    capability to avoid needlessly loading data
  - Update several handlers to use new functionality to avoid accessing
    the block manager so they are able to return the data without
    blocking when the server is busy processing blocks
  - Update non-verbose getblock to avoid deserialization and
    serialization overhead
  - Update getblockheader to request the block height directly from
    chain and only load the header
  - Update getdifficulty to use the new cached data from chain
  - Update getmininginfo to use the new cached data from chain
  - Update non-verbose getrawtransaction to avoid deserialization and
    serialization overhead
  - Update gettxout to use the new utxo store versus loading
    full transactions using the transaction index (see the sketch after
    this list)
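
For example, a gettxout-style lookup can now hit the pruned utxo set directly
instead of loading and deserializing the whole transaction through the
transaction index.  FetchUtxoEntry and the UtxoEntry accessors below are
assumed names used only to show the shape of the change, and the reported
block hash comes from the best chain snapshot per the handler fix above:

    // Sketch only: the utxo set accessors are assumptions, not the
    // package's confirmed API.
    func exampleGetTxOut(chain *blockchain.BlockChain, txHash *chainhash.Hash,
        outIndex uint32) error {
        entry, err := chain.FetchUtxoEntry(txHash)
        if err != nil {
            return err
        }
        if entry == nil || entry.IsOutputSpent(outIndex) {
            return fmt.Errorf("output %v:%d is spent or does not exist",
                txHash, outIndex)
        }

        // Only the requested output is touched; the full transaction is
        // never loaded.  The hash reported to the caller is the current
        // best chain tip rather than the block the txout was found in.
        best := chain.BestSnapshot()
        fmt.Printf("unspent output %v:%d holds %d atoms (best block %v)\n",
            txHash, outIndex, entry.AmountByIndex(outIndex), best.Hash)
        return nil
    }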

The following is an overview of the utility changes:

- Update addblock to use the new database and chain interfaces
- Update findcheckpoint to use the new database and chain interfaces
- Remove the dropafter utility which is no longer supported

NOTE: The transaction index and address index will be reimplemented in
another commit.
2016-08-18 15:42:18 -04:00


// Copyright (c) 2013-2014 The btcsuite developers
// Copyright (c) 2015-2016 The Decred developers
// Use of this source code is governed by an ISC
// license that can be found in the LICENSE file.

package main

/*
import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/decred/dcrd/blockchain"
	"github.com/decred/dcrd/blockchain/stake"
	"github.com/decred/dcrd/chaincfg/chainhash"
	"github.com/decred/dcrd/database"
	"github.com/decred/dcrd/txscript"
	"github.com/decred/dcrd/wire"
	"github.com/decred/dcrutil"

	"github.com/btcsuite/golangcrypto/ripemd160"
)

TODO Replace this with a new addrindexer

type indexState int

const (
	// Our two operating modes.
	//
	// We go into "CatchUp" mode when, on boot, the current best
	// chain height is greater than the last block we've indexed.
	// "CatchUp" mode is characterized by several concurrent worker
	// goroutines indexing blocks organized by a manager goroutine.
	// When in "CatchUp" mode, incoming requests to index newly solved
	// blocks are backed up for later processing. Once we've finished
	// catching up, we process these queued jobs, and then enter into
	// "maintenance" mode.
	indexCatchUp indexState = iota

	// When in "maintenance" mode, we have a single worker serially
	// processing incoming jobs to index newly solved blocks.
	indexMaintain
)

// addrIndexer provides a concurrent service for indexing the transactions of
// target blocks based on the addresses involved in the transaction.
type addrIndexer struct {
	server          *server
	started         int32
	shutdown        int32
	state           indexState
	progressLogger  *blockProgressLogger
	currentIndexTip int64
	chainTip        int64
	sync.Mutex
}

// newAddrIndexer creates a new block address indexer.
// Use Start to begin processing incoming index jobs.
func newAddrIndexer(s *server) (*addrIndexer, error) {
	_, chainHeight, err := s.db.NewestSha()
	if err != nil {
		return nil, err
	}

	_, lastIndexedHeight, err := s.db.FetchAddrIndexTip()
	if err != nil && err != database.ErrAddrIndexDoesNotExist {
		return nil, err
	}

	var state indexState
	if chainHeight == lastIndexedHeight {
		state = indexMaintain
	} else {
		state = indexCatchUp
	}

	ai := &addrIndexer{
		server:          s,
		state:           state,
		currentIndexTip: lastIndexedHeight,
		chainTip:        chainHeight,
		progressLogger: newBlockProgressLogger("Indexed addresses of",
			adxrLog),
	}
	return ai, nil
}

// Start begins processing of incoming indexing jobs.
func (a *addrIndexer) Start() {
	// Already started?
	if atomic.AddInt32(&a.started, 1) != 1 {
		return
	}

	adxrLog.Trace("Starting address indexer")
	err := a.initialize()
	if err != nil {
		adxrLog.Errorf("Couldn't start address indexer: %v", err.Error())
		return
	}
}

// Stop gracefully shuts down the address indexer by stopping all ongoing
// worker goroutines and waiting for them to finish their current tasks.
func (a *addrIndexer) Stop() error {
	if atomic.AddInt32(&a.shutdown, 1) != 1 {
		adxrLog.Warnf("Address indexer is already in the process of " +
			"shutting down")
		return nil
	}

	adxrLog.Infof("Address indexer shutting down")
	return nil
}

// IsCaughtUp returns a bool representing whether the address indexer has
// caught up with the best height on the main chain.
func (a *addrIndexer) IsCaughtUp() bool {
	a.Lock()
	defer a.Unlock()
	return a.state == indexMaintain
}

// initialize starts the address indexer and fills the address index up to
// the current best chain height.
func (a *addrIndexer) initialize() error {
	if a.state == indexCatchUp {
		adxrLog.Infof("Building up address index from height %v to %v.",
			a.currentIndexTip+1, a.chainTip)

		// Starting from the next block after our current index tip,
		// feed our workers each successive block to index until we've
		// caught up to the current highest block height.
		lastBlockIdxHeight := a.currentIndexTip + 1
		for lastBlockIdxHeight <= a.chainTip {
			// Skip the genesis block.
			if lastBlockIdxHeight != 0 {
				targetSha, err := a.server.db.FetchBlockShaByHeight(
					lastBlockIdxHeight)
				if err != nil {
					return fmt.Errorf("Unable to look up the sha of the "+
						"next target block (height %v): %v",
						lastBlockIdxHeight, err)
				}
				targetBlock, err := a.server.db.FetchBlockBySha(targetSha)
				if err != nil {
					// Unable to locate a target block by sha, this
					// is a critical error, we may have an
					// inconsistency in the DB.
					return fmt.Errorf("Unable to look up the next "+
						"target block (sha %v): %v", targetSha, err)
				}
				targetParent, err := a.server.db.FetchBlockBySha(
					&targetBlock.MsgBlock().Header.PrevBlock)
				if err != nil {
					// Unable to locate the parent block by sha, this
					// is a critical error, we may have an
					// inconsistency in the DB.
					return fmt.Errorf("Unable to look up the next "+
						"target block parent (sha %v): %v",
						targetBlock.MsgBlock().Header.PrevBlock, err)
				}
				addrIndex, err := a.indexBlockAddrs(targetBlock, targetParent)
				if err != nil {
					return fmt.Errorf("Unable to index transactions of"+
						" block: %v", err)
				}
				err = a.server.db.UpdateAddrIndexForBlock(targetSha,
					lastBlockIdxHeight,
					addrIndex)
				if err != nil {
					return fmt.Errorf("Unable to insert block: %v", err.Error())
				}
			}
			lastBlockIdxHeight++
		}

		a.Lock()
		a.state = indexMaintain
		a.Unlock()
	}

	adxrLog.Debugf("Address indexer has queued up to best height, safe " +
		"to begin maintenance mode")
	return nil
}

// convertToAddrIndex indexes all data pushes greater than 8 bytes within the
// passed SPK and returns a TxAddrIndex with the given data. Our "address"
// index is actually a hash160 index, where in the ideal case the data push
// is either the hash160 of a publicKey (P2PKH) or a Script (P2SH).
func convertToAddrIndex(scrVersion uint16, scr []byte, height int64,
	locInBlock *wire.TxLoc, txType stake.TxType) ([]*database.TxAddrIndex, error) {
	var tais []*database.TxAddrIndex

	if scr == nil || locInBlock == nil {
		return nil, fmt.Errorf("passed nil pointer")
	}

	var indexKey [ripemd160.Size]byte

	// Get the script classes and extract the PKH if applicable.
	// If it's multisig, unknown, etc, just hash the script itself.
	class, addrs, _, err := txscript.ExtractPkScriptAddrs(scrVersion, scr,
		activeNetParams.Params)
	if err != nil {
		return nil, fmt.Errorf("script conversion error: %v", err.Error())
	}

	knownType := false
	for _, addr := range addrs {
		switch {
		case class == txscript.PubKeyTy:
			copy(indexKey[:], addr.Hash160()[:])
		case class == txscript.PubkeyAltTy:
			copy(indexKey[:], addr.Hash160()[:])
		case class == txscript.PubKeyHashTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.PubkeyHashAltTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.StakeSubmissionTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.StakeGenTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.StakeRevocationTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.StakeSubChangeTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.MultiSigTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		case class == txscript.ScriptHashTy:
			copy(indexKey[:], addr.ScriptAddress()[:])
		}

		tai := &database.TxAddrIndex{
			Hash160:  indexKey,
			Height:   uint32(height),
			TxOffset: uint32(locInBlock.TxStart),
			TxLen:    uint32(locInBlock.TxLen),
		}
		tais = append(tais, tai)
		knownType = true
	}

	// This is a commitment for a future vote or revocation.  Extract the
	// address data from it and store it in the addrIndex.
	if txType == stake.TxTypeSStx && class == txscript.NullDataTy {
		addr, err := stake.AddrFromSStxPkScrCommitment(scr,
			activeNetParams.Params)
		if err != nil {
			return nil, fmt.Errorf("ticket commit pkscr conversion error: %v",
				err.Error())
		}

		copy(indexKey[:], addr.ScriptAddress()[:])
		tai := &database.TxAddrIndex{
			Hash160:  indexKey,
			Height:   uint32(height),
			TxOffset: uint32(locInBlock.TxStart),
			TxLen:    uint32(locInBlock.TxLen),
		}
		tais = append(tais, tai)
	} else if !knownType {
		copy(indexKey[:], dcrutil.Hash160(scr))
		tai := &database.TxAddrIndex{
			Hash160:  indexKey,
			Height:   uint32(height),
			TxOffset: uint32(locInBlock.TxStart),
			TxLen:    uint32(locInBlock.TxLen),
		}
		tais = append(tais, tai)
	}

	return tais, nil
}

// lookupTransaction is a special transaction lookup function that searches
// the database, the block, and its parent for a transaction. This is needed
// because indexBlockAddrs is called AFTER a block is added/removed in the
// blockchain in blockManager, necessitating that the blocks internally be
// searched for inputs for any given transaction too. Additionally, it's
// faster to get the tx from the blocks here since they're already loaded
// into memory.
func (a *addrIndexer) lookupTransaction(txHash chainhash.Hash, blk *dcrutil.Block,
	parent *dcrutil.Block) (*wire.MsgTx, error) {
	// Search the passed block and its parent first.
	txTreeRegularValid := dcrutil.IsFlagSet16(blk.MsgBlock().Header.VoteBits,
		dcrutil.BlockValid)

	// Search the regular tx tree of this and the last block if the
	// regular tx tree was validated.
	if txTreeRegularValid {
		for _, stx := range parent.STransactions() {
			if stx.Sha().IsEqual(&txHash) {
				return stx.MsgTx(), nil
			}
		}
		for _, tx := range parent.Transactions() {
			if tx.Sha().IsEqual(&txHash) {
				return tx.MsgTx(), nil
			}
		}
		for _, tx := range blk.Transactions() {
			if tx.Sha().IsEqual(&txHash) {
				return tx.MsgTx(), nil
			}
		}
	} else {
		// Just search this block's regular tx tree and the previous
		// block's stake tx tree.
		for _, stx := range parent.STransactions() {
			if stx.Sha().IsEqual(&txHash) {
				return stx.MsgTx(), nil
			}
		}
		for _, tx := range blk.Transactions() {
			if tx.Sha().IsEqual(&txHash) {
				return tx.MsgTx(), nil
			}
		}
	}

	// Lookup and fetch the referenced output's tx in the database.
	txList, err := a.server.db.FetchTxBySha(&txHash)
	if err != nil {
		adxrLog.Errorf("Error fetching tx %v: %v",
			txHash, err)
		return nil, err
	}

	if len(txList) == 0 {
		return nil, fmt.Errorf("transaction %v not found",
			txHash)
	}

	return txList[len(txList)-1].Tx, nil
}

// indexBlockAddrs returns a populated index of all the transactions in the
// passed block based on the addresses involved in each transaction.
func (a *addrIndexer) indexBlockAddrs(blk *dcrutil.Block,
	parent *dcrutil.Block) (database.BlockAddrIndex, error) {
	var addrIndex database.BlockAddrIndex
	_, stxLocs, err := blk.TxLoc()
	if err != nil {
		return nil, err
	}

	txTreeRegularValid := dcrutil.IsFlagSet16(blk.MsgBlock().Header.VoteBits,
		dcrutil.BlockValid)

	// Add regular transactions iff the block was validated.
	if txTreeRegularValid {
		txLocs, _, err := parent.TxLoc()
		if err != nil {
			return nil, err
		}
		for txIdx, tx := range parent.Transactions() {
			// Tx's offset and length in the block.
			locInBlock := &txLocs[txIdx]

			// Coinbases don't have any inputs.
			if !blockchain.IsCoinBase(tx) {
				// Index the SPK's of each input's previous outpoint
				// transaction.
				for _, txIn := range tx.MsgTx().TxIn {
					prevOutTx, err := a.lookupTransaction(
						txIn.PreviousOutPoint.Hash,
						blk,
						parent)
					if err != nil {
						adxrLog.Tracef("Error looking up tx for txin %v: %v",
							txIn.PreviousOutPoint.Hash, err)
						continue
					}
					inputOutPoint := prevOutTx.TxOut[txIn.PreviousOutPoint.Index]

					toAppend, err := convertToAddrIndex(inputOutPoint.Version,
						inputOutPoint.PkScript, parent.Height(), locInBlock,
						stake.TxTypeRegular)
					if err != nil {
						adxrLog.Tracef("Error converting tx txin %v: %v",
							txIn.PreviousOutPoint.Hash, err)
						continue
					}
					addrIndex = append(addrIndex, toAppend...)
				}
			}

			for _, txOut := range tx.MsgTx().TxOut {
				toAppend, err := convertToAddrIndex(txOut.Version, txOut.PkScript,
					parent.Height(), locInBlock, stake.TxTypeRegular)
				if err != nil {
					adxrLog.Tracef("Error converting tx txout %v: %v",
						tx.MsgTx().TxSha(), err)
					continue
				}
				addrIndex = append(addrIndex, toAppend...)
			}
		}
	}

	// Add stake transactions.
	for stxIdx, stx := range blk.STransactions() {
		// Tx's offset and length in the block.
		locInBlock := &stxLocs[stxIdx]

		txType := stake.DetermineTxType(stx)

		// Index the SPK's of each input's previous outpoint
		// transaction.
		for i, txIn := range stx.MsgTx().TxIn {
			// Stakebases don't have any inputs.
			if txType == stake.TxTypeSSGen && i == 0 {
				continue
			}

			// Lookup and fetch the referenced output's tx.
			prevOutTx, err := a.lookupTransaction(
				txIn.PreviousOutPoint.Hash,
				blk,
				parent)
			if err != nil {
				adxrLog.Tracef("Error looking up stx for txin %v: %v",
					txIn.PreviousOutPoint.Hash, err)
				continue
			}
			inputOutPoint := prevOutTx.TxOut[txIn.PreviousOutPoint.Index]

			toAppend, err := convertToAddrIndex(inputOutPoint.Version,
				inputOutPoint.PkScript, blk.Height(), locInBlock,
				txType)
			if err != nil {
				adxrLog.Tracef("Error converting stx txin %v: %v",
					txIn.PreviousOutPoint.Hash, err)
				continue
			}
			addrIndex = append(addrIndex, toAppend...)
		}

		for _, txOut := range stx.MsgTx().TxOut {
			toAppend, err := convertToAddrIndex(txOut.Version, txOut.PkScript,
				blk.Height(), locInBlock, txType)
			if err != nil {
				adxrLog.Tracef("Error converting stx txout %v: %v",
					stx.MsgTx().TxSha(), err)
				continue
			}
			addrIndex = append(addrIndex, toAppend...)
		}
	}

	return addrIndex, nil
}

// InsertBlock synchronously queues a newly solved block to have its
// transactions indexed by address.
func (a *addrIndexer) InsertBlock(block *dcrutil.Block, parent *dcrutil.Block) error {
	addrIndex, err := a.indexBlockAddrs(block, parent)
	if err != nil {
		return fmt.Errorf("Unable to index transactions of"+
			" block: %v", err)
	}

	err = a.server.db.UpdateAddrIndexForBlock(block.Sha(),
		block.Height(),
		addrIndex)
	if err != nil {
		return fmt.Errorf("Unable to insert block: %v", err.Error())
	}
	return nil
}

// RemoveBlock removes all transactions in a block at the tip of the chain
// from the address index database.
func (a *addrIndexer) RemoveBlock(block *dcrutil.Block,
	parent *dcrutil.Block) error {
	addrIndex, err := a.indexBlockAddrs(block, parent)
	if err != nil {
		return fmt.Errorf("Unable to index transactions of"+
			" block: %v", err)
	}

	err = a.server.db.DropAddrIndexForBlock(block.Sha(),
		block.Height(),
		addrIndex)
	if err != nil {
		return fmt.Errorf("Unable to remove block: %v", err.Error())
	}
	return nil
}
*/