Prevent high memory usage when turning txindex on first time (#412)

If the transaction index was run and never synced before, the first
time run will cause an OOM failure on machines with low RAM. This was
due to the GC never collecting blocks and freeing them after they were
used, because there were references to the block pointers or internal
pointers to the transaction hashes in the database updates. The
pointers to hashes have all been replaced with values to save memory
and allow the GC to free the blocks/transactions as the indexer runs.
This commit is contained in:
C Jepson 2016-10-10 13:28:52 -04:00 committed by Alex Yocom-Piatt
parent 26eb0c1591
commit cd57e44f20
4 changed files with 35 additions and 34 deletions

View File

@ -143,7 +143,7 @@ var (
// fetchBlockHashFunc defines a callback function to use in order to convert a
// serialized block ID to an associated block hash.
type fetchBlockHashFunc func(serializedID []byte) (*chainhash.Hash, error)
type fetchBlockHashFunc func(serializedID []byte) (chainhash.Hash, error)
// serializeAddrIndexEntry serializes the provided block id and transaction
// location according to the format described in detail above.
@ -170,7 +170,7 @@ func deserializeAddrIndexEntry(serialized []byte, region *database.BlockRegion,
if err != nil {
return err
}
region.Hash = hash
region.Hash = &hash
region.Offset = byteOrder.Uint32(serialized[4:8])
region.Len = byteOrder.Uint32(serialized[8:12])
return nil
@ -816,7 +816,8 @@ func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block, parent *dcrutil.Bloc
return err
}
parentBlockID, err = dbFetchBlockIDByHash(dbTx, parent.Sha())
parentSha := parent.Sha()
parentBlockID, err = dbFetchBlockIDByHash(dbTx, *parentSha)
if err != nil {
return err
}
@ -835,7 +836,8 @@ func (idx *AddrIndex) ConnectBlock(dbTx database.Tx, block, parent *dcrutil.Bloc
}
// Get the internal block ID associated with the block.
blockID, err := dbFetchBlockIDByHash(dbTx, block.Sha())
blockSha := block.Sha()
blockID, err := dbFetchBlockIDByHash(dbTx, *blockSha)
if err != nil {
return err
}
@ -913,7 +915,7 @@ func (idx *AddrIndex) TxRegionsForAddress(dbTx database.Tx, addr dcrutil.Address
err = idx.db.View(func(dbTx database.Tx) error {
// Create closure to lookup the block hash given the ID using
// the database transaction.
fetchBlockHash := func(id []byte) (*chainhash.Hash, error) {
fetchBlockHash := func(id []byte) (chainhash.Hash, error) {
// Deserialize and populate the result.
return dbFetchBlockHashBySerializedID(dbTx, id)
}

View File

@ -476,7 +476,7 @@ func indexNeedsInputs(index Indexer) bool {
// dbFetchTx looks up the passed transaction hash in the transaction index and
// loads it from the database.
func dbFetchTx(dbTx database.Tx, hash *chainhash.Hash) (*wire.MsgTx, error) {
func dbFetchTx(dbTx database.Tx, hash chainhash.Hash) (*wire.MsgTx, error) {
// Look up the location of the transaction.
blockRegion, err := dbFetchTxIndexEntry(dbTx, hash)
if err != nil {
@ -530,7 +530,7 @@ func makeUtxoView(dbTx database.Tx, block, parent *dcrutil.Block) (*blockchain.U
continue
}
originTx, err := dbFetchTx(dbTx, &originOut.Hash)
originTx, err := dbFetchTx(dbTx, originOut.Hash)
if err != nil {
return nil, err
}
@ -557,7 +557,7 @@ func makeUtxoView(dbTx database.Tx, block, parent *dcrutil.Block) (*blockchain.U
continue
}
originTx, err := dbFetchTx(dbTx, &originOut.Hash)
originTx, err := dbFetchTx(dbTx, originOut.Hash)
if err != nil {
return nil, err
}

View File

@ -37,9 +37,6 @@ var (
// errNoBlockIDEntry is an error that indicates a requested entry does
// not exist in the block ID index.
errNoBlockIDEntry = errors.New("no entry in the block ID index")
// errNilHashPointer indicating that the passed hash pointer is nil.
errNilHashPointer = errors.New("nil hash pointer argument passed")
)
// -----------------------------------------------------------------------------
@ -98,7 +95,7 @@ var (
// dbPutBlockIDIndexEntry uses an existing database transaction to update or add
// the index entries for the hash to id and id to hash mappings for the provided
// values.
func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash, id uint32) error {
func dbPutBlockIDIndexEntry(dbTx database.Tx, hash chainhash.Hash, id uint32) error {
// Serialize the height for use in the index entries.
var serializedID [4]byte
byteOrder.PutUint32(serializedID[:], id)
@ -117,7 +114,7 @@ func dbPutBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash, id uint32) e
// dbRemoveBlockIDIndexEntry uses an existing database transaction remove index
// entries from the hash to id and id to hash mappings for the provided hash.
func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash) error {
func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash chainhash.Hash) error {
// Remove the block hash to ID mapping.
meta := dbTx.Metadata()
hashIndex := meta.Bucket(idByHashIndexBucketName)
@ -136,11 +133,7 @@ func dbRemoveBlockIDIndexEntry(dbTx database.Tx, hash *chainhash.Hash) error {
// dbFetchBlockIDByHash uses an existing database transaction to retrieve the
// block id for the provided hash from the index.
func dbFetchBlockIDByHash(dbTx database.Tx, hash *chainhash.Hash) (uint32, error) {
if hash == nil {
return 0, errNilHashPointer
}
func dbFetchBlockIDByHash(dbTx database.Tx, hash chainhash.Hash) (uint32, error) {
hashIndex := dbTx.Metadata().Bucket(idByHashIndexBucketName)
serializedID := hashIndex.Get(hash[:])
if serializedID == nil {
@ -152,21 +145,21 @@ func dbFetchBlockIDByHash(dbTx database.Tx, hash *chainhash.Hash) (uint32, error
// dbFetchBlockHashBySerializedID uses an existing database transaction to
// retrieve the hash for the provided serialized block id from the index.
func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (*chainhash.Hash, error) {
func dbFetchBlockHashBySerializedID(dbTx database.Tx, serializedID []byte) (chainhash.Hash, error) {
idIndex := dbTx.Metadata().Bucket(hashByIDIndexBucketName)
hashBytes := idIndex.Get(serializedID)
if hashBytes == nil {
return nil, errNoBlockIDEntry
return chainhash.Hash{}, errNoBlockIDEntry
}
var hash chainhash.Hash
copy(hash[:], hashBytes)
return &hash, nil
return hash, nil
}
// dbFetchBlockHashByID uses an existing database transaction to retrieve the
// hash for the provided block id from the index.
func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (*chainhash.Hash, error) {
func dbFetchBlockHashByID(dbTx database.Tx, id uint32) (chainhash.Hash, error) {
var serializedID [4]byte
byteOrder.PutUint32(serializedID[:], id)
return dbFetchBlockHashBySerializedID(dbTx, serializedID[:])
@ -185,7 +178,7 @@ func putTxIndexEntry(target []byte, blockID uint32, txLoc wire.TxLoc) {
// dbPutTxIndexEntry uses an existing database transaction to update the
// transaction index given the provided serialized data that is expected to have
// been serialized putTxIndexEntry.
func dbPutTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash, serializedData []byte) error {
func dbPutTxIndexEntry(dbTx database.Tx, txHash chainhash.Hash, serializedData []byte) error {
txIndex := dbTx.Metadata().Bucket(txIndexKey)
return txIndex.Put(txHash[:], serializedData)
}
@ -194,7 +187,7 @@ func dbPutTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash, serializedData
// region for the provided transaction hash from the transaction index. When
// there is no entry for the provided hash, nil will be returned for the both
// the region and the error.
func dbFetchTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash) (*database.BlockRegion, error) {
func dbFetchTxIndexEntry(dbTx database.Tx, txHash chainhash.Hash) (*database.BlockRegion, error) {
// Load the record from the database and return now if it doesn't exist.
txIndex := dbTx.Metadata().Bucket(txIndexKey)
serializedData := txIndex.Get(txHash[:])
@ -250,7 +243,8 @@ func dbAddTxIndexEntries(dbTx database.Tx, block, parent *dcrutil.Block, blockID
return err
}
parentBlockID, err = dbFetchBlockIDByHash(dbTx, parent.Sha())
parentSha := parent.Sha()
parentBlockID, err = dbFetchBlockIDByHash(dbTx, *parentSha)
if err != nil {
return err
}
@ -281,7 +275,8 @@ func dbAddTxIndexEntries(dbTx database.Tx, block, parent *dcrutil.Block, blockID
putTxIndexEntry(serializedValues[offset:], blockIDToUse, allTxsLocs[i])
endOffset := offset + txEntrySize
err := dbPutTxIndexEntry(dbTx, tx.Sha(),
txSha := tx.Sha()
err := dbPutTxIndexEntry(dbTx, *txSha,
serializedValues[offset:endOffset:endOffset])
if err != nil {
return err
@ -294,7 +289,7 @@ func dbAddTxIndexEntries(dbTx database.Tx, block, parent *dcrutil.Block, blockID
// dbRemoveTxIndexEntry uses an existing database transaction to remove the most
// recent transaction index entry for the given hash.
func dbRemoveTxIndexEntry(dbTx database.Tx, txHash *chainhash.Hash) error {
func dbRemoveTxIndexEntry(dbTx database.Tx, txHash chainhash.Hash) error {
txIndex := dbTx.Metadata().Bucket(txIndexKey)
serializedData := txIndex.Get(txHash[:])
if len(serializedData) == 0 {
@ -312,14 +307,16 @@ func dbRemoveTxIndexEntries(dbTx database.Tx, block, parent *dcrutil.Block) erro
dcrutil.BlockValid)
if regularTxTreeValid {
for _, tx := range parent.Transactions() {
err := dbRemoveTxIndexEntry(dbTx, tx.Sha())
txSha := tx.Sha()
err := dbRemoveTxIndexEntry(dbTx, *txSha)
if err != nil {
return err
}
}
}
for _, tx := range block.STransactions() {
err := dbRemoveTxIndexEntry(dbTx, tx.Sha())
txSha := tx.Sha()
err := dbRemoveTxIndexEntry(dbTx, *txSha)
if err != nil {
return err
}
@ -447,7 +444,8 @@ func (idx *TxIndex) ConnectBlock(dbTx database.Tx, block, parent *dcrutil.Block,
// Add the new block ID index entry for the block being connected and
// update the current internal block ID accordingly.
err := dbPutBlockIDIndexEntry(dbTx, block.Sha(), newBlockID)
blockSha := block.Sha()
err := dbPutBlockIDIndexEntry(dbTx, *blockSha, newBlockID)
if err != nil {
return err
}
@ -468,7 +466,8 @@ func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block, parent *dcrutil.Blo
// Remove the block ID index entry for the block being disconnected and
// decrement the current internal block ID to account for it.
if err := dbRemoveBlockIDIndexEntry(dbTx, block.Sha()); err != nil {
blockSha := block.Sha()
if err := dbRemoveBlockIDIndexEntry(dbTx, *blockSha); err != nil {
return err
}
idx.curBlockID--
@ -481,7 +480,7 @@ func (idx *TxIndex) DisconnectBlock(dbTx database.Tx, block, parent *dcrutil.Blo
// will be returned for the both the entry and the error.
//
// This function is safe for concurrent access.
func (idx *TxIndex) TxBlockRegion(hash *chainhash.Hash) (*database.BlockRegion, error) {
func (idx *TxIndex) TxBlockRegion(hash chainhash.Hash) (*database.BlockRegion, error) {
var region *database.BlockRegion
err := idx.db.View(func(dbTx database.Tx) error {
var err error

View File

@ -3691,7 +3691,7 @@ func handleGetRawTransaction(s *rpcServer, cmd interface{}, closeChan <-chan str
}
// Look up the location of the transaction.
blockRegion, err := txIndex.TxBlockRegion(txHash)
blockRegion, err := txIndex.TxBlockRegion(*txHash)
if err != nil {
context := "Failed to retrieve transaction location"
return nil, internalRPCError(err.Error(), context)
@ -4504,7 +4504,7 @@ func fetchInputTxos(s *rpcServer, tx *wire.MsgTx) (map[wire.OutPoint]wire.TxOut,
}
// Look up the location of the transaction.
blockRegion, err := s.server.txIndex.TxBlockRegion(&origin.Hash)
blockRegion, err := s.server.txIndex.TxBlockRegion(origin.Hash)
if err != nil {
context := "Failed to retrieve transaction location"
return nil, internalRPCError(err.Error(), context)