mirror of
https://github.com/FlipsideCrypto/dcrd.git
synced 2026-02-06 10:56:47 +00:00
indexers: Provide interface for index removal.
Indexes may now optionally provide their own implementation for dropping the index, with a fallback to simply removing the index bucket and metadata if not implemented. Using an interface and dynamically dispatching to the correct drop implementation also allowed removing a special case for deletion of the transaction index from the common drop code.
This commit is contained in:
parent
0fc55252f9
commit
b092705295
@ -1091,5 +1091,10 @@ func NewAddrIndex(db database.DB, chainParams *chaincfg.Params) *AddrIndex {
|
||||
// DropAddrIndex drops the address index from the provided database if it
|
||||
// exists.
|
||||
func DropAddrIndex(db database.DB, interrupt <-chan struct{}) error {
|
||||
return dropIndex(db, addrIndexKey, addrIndexName, interrupt)
|
||||
return dropFlatIndex(db, addrIndexKey, addrIndexName, interrupt)
|
||||
}
|
||||
|
||||
// DropIndex drops the address index from the provided database if it exists.
|
||||
func (*AddrIndex) DropIndex(db database.DB, interrupt <-chan struct{}) error {
|
||||
return DropAddrIndex(db, interrupt)
|
||||
}
|
||||
|
||||
@ -60,6 +60,13 @@ type Indexer interface {
|
||||
DisconnectBlock(dbTx database.Tx, block, parent *dcrutil.Block, view *blockchain.UtxoViewpoint) error
|
||||
}
|
||||
|
||||
// IndexDropper provides a method to remove an index from the database. Indexers
|
||||
// may implement this for a more efficient way of deleting themselves from the
|
||||
// database rather than simply dropping a bucket.
|
||||
type IndexDropper interface {
|
||||
DropIndex(db database.DB, interrupt <-chan struct{}) error
|
||||
}
|
||||
|
||||
// AssertError identifies an error that indicates an internal code consistency
|
||||
// issue and should be treated as a critical and unrecoverable error.
|
||||
type AssertError string
|
||||
|
||||
@ -411,6 +411,12 @@ func (idx *ExistsAddrIndex) AddUnconfirmedTx(tx *wire.MsgTx) {
|
||||
// DropExistsAddrIndex drops the exists address index from the provided
|
||||
// database if it exists.
|
||||
func DropExistsAddrIndex(db database.DB, interrupt <-chan struct{}) error {
|
||||
return dropIndex(db, existsAddrIndexKey, existsAddressIndexName,
|
||||
return dropFlatIndex(db, existsAddrIndexKey, existsAddressIndexName,
|
||||
interrupt)
|
||||
}
|
||||
|
||||
// DropIndex drops the exists address index from the provided database if it
|
||||
// exists.
|
||||
func (*ExistsAddrIndex) DropIndex(db database.DB, interrupt <-chan struct{}) error {
|
||||
return DropExistsAddrIndex(db, interrupt)
|
||||
}
|
||||
|
||||
@ -188,9 +188,18 @@ func (m *Manager) maybeFinishDrops(interrupt <-chan struct{}) error {
|
||||
}
|
||||
|
||||
log.Infof("Resuming %s drop", indexer.Name())
|
||||
err := dropIndex(m.db, indexer.Key(), indexer.Name(), interrupt)
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
switch d := indexer.(type) {
|
||||
case IndexDropper:
|
||||
err := d.DropIndex(m.db, interrupt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
err := dropIndex(m.db, indexer.Key(), indexer.Name())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -653,47 +662,32 @@ func NewManager(db database.DB, enabledIndexes []Indexer, params *chaincfg.Param
|
||||
}
|
||||
}
|
||||
|
||||
// dropIndex drops the passed index from the database. Since indexes can be
|
||||
// massive, it deletes the index in multiple database transactions in order to
|
||||
// keep memory usage to reasonable levels. It also marks the drop in progress
|
||||
// so the drop can be resumed if it is stopped before it is done before the
|
||||
// index can be used again.
|
||||
func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
|
||||
// Nothing to do if the index doesn't already exist.
|
||||
var needsDelete bool
|
||||
// existsIndex returns whether the index keyed by idxKey exists in the database.
|
||||
func existsIndex(db database.DB, idxKey []byte, idxName string) (bool, error) {
|
||||
var exists bool
|
||||
err := db.View(func(dbTx database.Tx) error {
|
||||
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
|
||||
if indexesBucket != nil && indexesBucket.Get(idxKey) != nil {
|
||||
needsDelete = true
|
||||
exists = true
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !needsDelete {
|
||||
log.Infof("Not dropping %s because it does not exist", idxName)
|
||||
return nil
|
||||
}
|
||||
return exists, err
|
||||
}
|
||||
|
||||
// Mark that the index is in the process of being dropped so that it
|
||||
// can be resumed on the next start if interrupted before the process is
|
||||
// complete.
|
||||
log.Infof("Dropping all %s entries. This might take a while...",
|
||||
idxName)
|
||||
err = db.Update(func(dbTx database.Tx) error {
|
||||
// markIndexDeletion marks the index identified by idxKey for deletion. Marking
|
||||
// an index for deletion allows deletion to resume next startup if an
|
||||
// incremental deletion was interrupted.
|
||||
func markIndexDeletion(db database.DB, idxKey []byte) error {
|
||||
return db.Update(func(dbTx database.Tx) error {
|
||||
indexesBucket := dbTx.Metadata().Bucket(indexTipsBucketName)
|
||||
return indexesBucket.Put(indexDropKey(idxKey), idxKey)
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Since the indexes can be so large, attempting to simply delete
|
||||
// the bucket in a single database transaction would result in massive
|
||||
// memory usage and likely crash many systems due to ulimits. In order
|
||||
// to avoid this, use a cursor to delete a maximum number of entries out
|
||||
// of the bucket at a time.
|
||||
// incrementalFlatDrop uses multiple database updates to remove key/value pairs
|
||||
// saved to a flat index.
|
||||
func incrementalFlatDrop(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
|
||||
const maxDeletions = 2000000
|
||||
var totalDeleted uint64
|
||||
for numDeleted := maxDeletions; numDeleted == maxDeletions; {
|
||||
@ -725,29 +719,106 @@ func dropIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan s
|
||||
return errInterruptRequested
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Call extra index specific deinitialization for the transaction index.
|
||||
if idxName == txIndexName {
|
||||
if err := dropBlockIDIndex(db); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Remove the index tip, index bucket, and in-progress drop flag now
|
||||
// that all index entries have been removed.
|
||||
err = db.Update(func(dbTx database.Tx) error {
|
||||
// dropIndexMetadata drops the passed index from the database by removing the
|
||||
// top level bucket for the index, the index tip, and any in-progress drop flag.
|
||||
func dropIndexMetadata(db database.DB, idxKey []byte, idxName string) error {
|
||||
return db.Update(func(dbTx database.Tx) error {
|
||||
meta := dbTx.Metadata()
|
||||
indexesBucket := meta.Bucket(indexTipsBucketName)
|
||||
if err := indexesBucket.Delete(idxKey); err != nil {
|
||||
err := indexesBucket.Delete(idxKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := meta.DeleteBucket(idxKey); err != nil {
|
||||
err = meta.DeleteBucket(idxKey)
|
||||
if err != nil && !database.IsError(err, database.ErrBucketNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
return indexesBucket.Delete(indexDropKey(idxKey))
|
||||
})
|
||||
}
|
||||
|
||||
// dropFlatIndex incrementally drops the passed index from the database. Since
|
||||
// indexes can be massive, it deletes the index in multiple database
|
||||
// transactions in order to keep memory usage to reasonable levels. For this
|
||||
// algorithm to work, the index must be "flat" (have no nested buckets). It
|
||||
// also marks the drop in progress so the drop can be resumed if it is stopped
|
||||
// before it is done before the index can be used again.
|
||||
func dropFlatIndex(db database.DB, idxKey []byte, idxName string, interrupt <-chan struct{}) error {
|
||||
// Nothing to do if the index doesn't already exist.
|
||||
exists, err := existsIndex(db, idxKey, idxName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
log.Infof("Not dropping %s because it does not exist", idxName)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Infof("Dropping all %s entries. This might take a while...",
|
||||
idxName)
|
||||
|
||||
// Mark that the index is in the process of being dropped so that it
|
||||
// can be resumed on the next start if interrupted before the process is
|
||||
// complete.
|
||||
err = markIndexDeletion(db, idxKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Since the indexes can be so large, attempting to simply delete
|
||||
// the bucket in a single database transaction would result in massive
|
||||
// memory usage and likely crash many systems due to ulimits. In order
|
||||
// to avoid this, use a cursor to delete a maximum number of entries out
|
||||
// of the bucket at a time.
|
||||
err = incrementalFlatDrop(db, idxKey, idxName, interrupt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the index tip, index bucket, and in-progress drop flag now
|
||||
// that all index entries have been removed.
|
||||
err = dropIndexMetadata(db, idxKey, idxName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Dropped %s", idxName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// dropIndex drops the passed index from the database without using incremental
|
||||
// deletion. This should be used to drop indexes containing nested buckets,
|
||||
// which can not be deleted with dropFlatIndex.
|
||||
func dropIndex(db database.DB, idxKey []byte, idxName string) error {
|
||||
// Nothing to do if the index doesn't already exist.
|
||||
exists, err := existsIndex(db, idxKey, idxName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
log.Infof("Not dropping %s because it does not exist", idxName)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Infof("Dropping all %s entries. This might take a while...",
|
||||
idxName)
|
||||
|
||||
// Mark that the index is in the process of being dropped so that it
|
||||
// can be resumed on the next start if interrupted before the process is
|
||||
// complete.
|
||||
err = markIndexDeletion(db, idxKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the index tip, index bucket, and in-progress drop flag. Removing
|
||||
// the index bucket also recursively removes all values saved to the index.
|
||||
err = dropIndexMetadata(db, idxKey, idxName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -509,10 +509,64 @@ func dropBlockIDIndex(db database.DB) error {
|
||||
// exists. Since the address index relies on it, the address index will also be
|
||||
// dropped when it exists.
|
||||
func DropTxIndex(db database.DB, interrupt <-chan struct{}) error {
|
||||
err := dropIndex(db, addrIndexKey, addrIndexName, interrupt)
|
||||
// Nothing to do if the index doesn't already exist.
|
||||
exists, err := existsIndex(db, txIndexKey, txIndexName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
log.Infof("Not dropping %s because it does not exist", txIndexName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Mark that the index is in the process of being dropped so that it
|
||||
// can be resumed on the next start if interrupted before the process is
|
||||
// complete.
|
||||
err = markIndexDeletion(db, txIndexKey)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return dropIndex(db, txIndexKey, txIndexName, interrupt)
|
||||
// Drop the address index if it exists, as it depends on the transaction
|
||||
// index.
|
||||
err = DropAddrIndex(db, interrupt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Dropping all %s entries. This might take a while...",
|
||||
txIndexName)
|
||||
|
||||
// Since the indexes can be so large, attempting to simply delete
|
||||
// the bucket in a single database transaction would result in massive
|
||||
// memory usage and likely crash many systems due to ulimits. In order
|
||||
// to avoid this, use a cursor to delete a maximum number of entries out
|
||||
// of the bucket at a time.
|
||||
err = incrementalFlatDrop(db, txIndexKey, txIndexName, interrupt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Call extra index specific deinitialization for the transaction index.
|
||||
err = dropBlockIDIndex(db)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove the index tip, index bucket, and in-progress drop flag now
|
||||
// that all index entries have been removed.
|
||||
err = dropIndexMetadata(db, txIndexKey, txIndexName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("Dropped %s", txIndexName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// DropIndex drops the transaction index from the provided database if it
|
||||
// exists. Since the address index relies on it, the address index will also be
|
||||
// dropped when it exists.
|
||||
func (*TxIndex) DropIndex(db database.DB, interrupt <-chan struct{}) error {
|
||||
return DropTxIndex(db, interrupt)
|
||||
}
|
||||
|
||||
@ -196,3 +196,9 @@ func (e Error) Error() string {
|
||||
func makeError(c ErrorCode, desc string, err error) Error {
|
||||
return Error{ErrorCode: c, Description: desc, Err: err}
|
||||
}
|
||||
|
||||
// IsError returns whether err is an Error with a matching error code.
|
||||
func IsError(err error, code ErrorCode) bool {
|
||||
e, ok := err.(Error)
|
||||
return ok && e.ErrorCode == code
|
||||
}
|
||||
|
||||
Loading…
Reference in New Issue
Block a user