diff --git a/claimtrie/node/manager.go b/claimtrie/node/manager.go index 48988c22..d92025cd 100644 --- a/claimtrie/node/manager.go +++ b/claimtrie/node/manager.go @@ -77,6 +77,7 @@ func (nc *nodeCache) Delete(key string) { existing := nc.elements[key] if existing != nil { delete(nc.elements, key) + existing.Value.(nodeCacheLeaf).node.Close() nc.data.Remove(existing) } } diff --git a/claimtrie/node/noderepo/pebble.go b/claimtrie/node/noderepo/pebble.go index 56cf0682..e444ae45 100644 --- a/claimtrie/node/noderepo/pebble.go +++ b/claimtrie/node/noderepo/pebble.go @@ -2,7 +2,9 @@ package noderepo import ( "bytes" + "io" "sort" + "sync" "github.com/btcsuite/btcd/claimtrie/change" "github.com/cockroachdb/pebble" @@ -13,9 +15,67 @@ type Pebble struct { db *pebble.DB } +type pooledMerger struct { + pool *sync.Pool + buffer []byte +} + +func (a *pooledMerger) MergeNewer(value []byte) error { + if a.buffer == nil { + a.buffer = a.pool.Get().([]byte) + } + a.buffer = append(a.buffer, value...) + return nil +} +func (a *pooledMerger) MergeOlder(value []byte) error { + if a.buffer == nil { + a.buffer = a.pool.Get().([]byte) + } + n := len(a.buffer) + if cap(a.buffer) >= len(value)+n { + a.buffer = a.buffer[0 : len(value)+n] // expand it + copy(a.buffer[len(value):], a.buffer[0:n]) // could overlap + copy(a.buffer, value) + } else { + existing := a.buffer + a.buffer = make([]byte, 0, len(value)+len(existing)) + a.buffer = append(a.buffer, value...) + a.buffer = append(a.buffer, existing...) + } + return nil +} + +func (a *pooledMerger) Finish(_ bool) ([]byte, io.Closer, error) { + return a.buffer, a, nil +} + +func (a *pooledMerger) Close() error { + a.pool.Put(a.buffer[:0]) + a.buffer = nil + a.pool = nil + return nil +} + func NewPebble(path string) (*Pebble, error) { - db, err := pebble.Open(path, &pebble.Options{Cache: pebble.NewCache(32 << 20), BytesPerSync: 4 << 20}) + mp := &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 256) + }, + } + + db, err := pebble.Open(path, &pebble.Options{ + Merger: &pebble.Merger{ + Merge: func(key, value []byte) (pebble.ValueMerger, error) { + p := &pooledMerger{pool: mp} + return p, p.MergeNewer(value) + }, + Name: pebble.DefaultMerger.Name, // yes, it's a lie + }, + Cache: pebble.NewCache(32 << 20), + BytesPerSync: 4 << 20, + }) + repo := &Pebble{db: db} return repo, errors.Wrapf(err, "unable to open %s", path) @@ -36,6 +96,7 @@ func (repo *Pebble) AppendChanges(changes []change.Change) error { return errors.Wrap(err, "in marshaller") } + // expecting this next line to make a copy of the buffer, not hold it err = batch.Merge(chg.Name, buffer.Bytes(), pebble.NoSync) if err != nil { return errors.Wrap(err, "in merge")