From 73dbcf3943beb36ced77c6e5eb0a4e63742b8a06 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 9 Nov 2017 17:13:40 -0700 Subject: [PATCH] multi: add bitcoind back-end (only for API use for now) --- chain/bitcoind.go | 1149 +++++++++++++++++++++++++++++++++++++++ chain/interface.go | 11 + chain/neutrino.go | 5 + chain/rpc.go | 5 + glide.lock | 7 + glide.yaml | 1 + wallet/notifications.go | 3 +- wallet/wallet.go | 8 + 8 files changed, 1188 insertions(+), 1 deletion(-) create mode 100644 chain/bitcoind.go diff --git a/chain/bitcoind.go b/chain/bitcoind.go new file mode 100644 index 0000000..0fb42c7 --- /dev/null +++ b/chain/bitcoind.go @@ -0,0 +1,1149 @@ +package chain + +import ( + "bytes" + "container/list" + "encoding/hex" + "errors" + "sync" + "time" + + "github.com/pebbe/zmq4" + "github.com/roasbeef/btcd/btcjson" + "github.com/roasbeef/btcd/chaincfg" + "github.com/roasbeef/btcd/chaincfg/chainhash" + "github.com/roasbeef/btcd/rpcclient" + "github.com/roasbeef/btcd/txscript" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" + "github.com/roasbeef/btcwallet/waddrmgr" + "github.com/roasbeef/btcwallet/wtxmgr" +) + +// BitcoindClient represents a persistent client connection to a bitcoind server +// for information regarding the current best block chain. +type BitcoindClient struct { + client *rpcclient.Client + connConfig *rpcclient.ConnConfig // Work around unexported field + chainParams *chaincfg.Params + + zmqConnect string + zmqPollInterval time.Duration + + enqueueNotification chan interface{} + dequeueNotification chan interface{} + currentBlock chan *waddrmgr.BlockStamp + + clientMtx sync.RWMutex + rescanUpdate chan interface{} + startTime time.Time + watchOutPoints map[wire.OutPoint]struct{} + watchAddrs map[string]struct{} + watchTxIDs map[chainhash.Hash]struct{} + notifyBlocks bool + notifyRecvd bool + + quit chan struct{} + wg sync.WaitGroup + started bool + quitMtx sync.Mutex + + memPool map[chainhash.Hash]struct{} + memPoolExp map[int32]map[chainhash.Hash]struct{} +} + +// NewBitcoindClient creates a client connection to the server described by the +// connect string. If disableTLS is false, the remote RPC certificate must be +// provided in the certs slice. The connection is not established immediately, +// but must be done using the Start method. If the remote server does not +// operate on the same bitcoin network as described by the passed chain +// parameters, the connection will be disconnected. +func NewBitcoindClient(chainParams *chaincfg.Params, connect, user, pass, + zmqConnect string, zmqPollInterval time.Duration) (*BitcoindClient, + error) { + + client := &BitcoindClient{ + connConfig: &rpcclient.ConnConfig{ + Host: connect, + User: user, + Pass: pass, + DisableAutoReconnect: false, + DisableConnectOnNew: true, + DisableTLS: true, + HTTPPostMode: true, + }, + chainParams: chainParams, + zmqConnect: zmqConnect, + zmqPollInterval: zmqPollInterval, + enqueueNotification: make(chan interface{}), + dequeueNotification: make(chan interface{}), + currentBlock: make(chan *waddrmgr.BlockStamp), + rescanUpdate: make(chan interface{}), + watchOutPoints: make(map[wire.OutPoint]struct{}), + watchAddrs: make(map[string]struct{}), + watchTxIDs: make(map[chainhash.Hash]struct{}), + quit: make(chan struct{}), + memPool: make(map[chainhash.Hash]struct{}), + memPoolExp: make(map[int32]map[chainhash.Hash]struct{}), + } + rpcClient, err := rpcclient.New(client.connConfig, nil) + if err != nil { + return nil, err + } + client.client = rpcClient + return client, nil +} + +// BackEnd returns the name of the driver. +func (c *BitcoindClient) BackEnd() string { + return "bitcoind" +} + +// GetCurrentNet returns the network on which the bitcoind instance is running. +func (c *BitcoindClient) GetCurrentNet() (wire.BitcoinNet, error) { + hash, err := c.client.GetBlockHash(0) + if err != nil { + return 0, err + } + + switch *hash { + case *chaincfg.TestNet3Params.GenesisHash: + return chaincfg.TestNet3Params.Net, nil + case *chaincfg.RegressionNetParams.GenesisHash: + return chaincfg.RegressionNetParams.Net, nil + case *chaincfg.MainNetParams.GenesisHash: + return chaincfg.MainNetParams.Net, nil + default: + return 0, errors.New("unknown network") + } +} + +// GetBestBlock returns the highest block known to bitcoind. +func (c *BitcoindClient) GetBestBlock() (*chainhash.Hash, int32, error) { + bcinfo, err := c.client.GetBlockChainInfo() + if err != nil { + return nil, 0, err + } + + hash, err := chainhash.NewHashFromStr(bcinfo.BestBlockHash) + if err != nil { + return nil, 0, err + } + + return hash, bcinfo.Blocks, nil +} + +// GetBlockHeight returns the height for the hash, if known, or returns an +// error. +func (c *BitcoindClient) GetBlockHeight(hash *chainhash.Hash) (int32, error) { + header, err := c.GetBlockHeaderVerbose(hash) + if err != nil { + return 0, err + } + + return header.Height, nil +} + +// GetBlock returns a block from the hash. +func (c *BitcoindClient) GetBlock(hash *chainhash.Hash) (*wire.MsgBlock, + error) { + return c.client.GetBlock(hash) +} + +// GetBlockVerbose returns a verbose block from the hash. +func (c *BitcoindClient) GetBlockVerbose(hash *chainhash.Hash) ( + *btcjson.GetBlockVerboseResult, error) { + return c.client.GetBlockVerbose(hash) +} + +// GetBlockHash returns a block hash from the height. +func (c *BitcoindClient) GetBlockHash(height int64) (*chainhash.Hash, error) { + return c.client.GetBlockHash(height) +} + +// GetBlockHeader returns a block header from the hash. +func (c *BitcoindClient) GetBlockHeader( + hash *chainhash.Hash) (*wire.BlockHeader, error) { + return c.client.GetBlockHeader(hash) +} + +// GetBlockHeaderVerbose returns a block header from the hash. +func (c *BitcoindClient) GetBlockHeaderVerbose(hash *chainhash.Hash) ( + *btcjson.GetBlockHeaderVerboseResult, error) { + return c.client.GetBlockHeaderVerbose(hash) +} + +// GetRawTransactionVerbose returns a transaction from the tx hash. +func (c *BitcoindClient) GetRawTransactionVerbose(hash *chainhash.Hash) ( + *btcjson.TxRawResult, error) { + return c.client.GetRawTransactionVerbose(hash) +} + +// GetTxOut returns a txout from the outpoint info provided. +func (c *BitcoindClient) GetTxOut(txHash *chainhash.Hash, index uint32, + mempool bool) (*btcjson.GetTxOutResult, error) { + return c.client.GetTxOut(txHash, index, mempool) +} + +// NotifyReceived updates the watch list with the passed addresses. +func (c *BitcoindClient) NotifyReceived(addrs []btcutil.Address) error { + select { + case c.rescanUpdate <- addrs: + case <-c.quit: + } + return nil +} + +// NotifySpent updates the watch list with the passed outPoints. +func (c *BitcoindClient) NotifySpent(outPoints []*wire.OutPoint) error { + select { + case c.rescanUpdate <- outPoints: + case <-c.quit: + } + return nil +} + +// NotifyTxIDs updates the watch list with the passed TxIDs. +func (c *BitcoindClient) NotifyTxIDs(txids []chainhash.Hash) error { + select { + case c.rescanUpdate <- txids: + case <-c.quit: + } + return nil +} + +// NotifyBlocks is always on. +func (c *BitcoindClient) NotifyBlocks() error { + return nil +} + +// LoadTxFilter updates the transaction watchlists for the client. Acceptable +// arguments after `reset` are any combination of []btcutil.Address, +// []wire.OutPoint, []*wire.OutPoint, []chainhash.Hash, and []*chainhash.Hash. +func (c *BitcoindClient) LoadTxFilter(reset bool, + watchLists ...interface{}) error { + + // If we reset, signal that. + if reset { + select { + case c.rescanUpdate <- reset: + case <-c.quit: + return nil + } + } + + // This helper function will send an update to the filter. If the quit + // channel is closed, it will allow the outer loop below to finish, + // but skip over any updates as the quit case is triggered each time. + sendList := func(list interface{}) { + select { + case c.rescanUpdate <- list: + case <-c.quit: + } + } + + for _, watchList := range watchLists { + switch list := watchList.(type) { + case []wire.OutPoint: + sendList(list) + case []*wire.OutPoint: + sendList(list) + case []btcutil.Address: + sendList(list) + case []chainhash.Hash: + sendList(list) + case []*chainhash.Hash: + sendList(list) + default: + log.Warnf("Couldn't add item to filter: unknown type") + } + } + return nil +} + +// RescanBlocks rescans any blocks passed, returning only the blocks that +// matched as []btcjson.BlockDetails. +func (c *BitcoindClient) RescanBlocks(blockHashes []chainhash.Hash) ( + []btcjson.RescannedBlock, error) { + + rescannedBlocks := make([]btcjson.RescannedBlock, 0, len(blockHashes)) + for _, hash := range blockHashes { + header, err := c.GetBlockHeaderVerbose(&hash) + if err != nil { + log.Warnf("Unable to get header %s from bitcoind: %s", + hash, err) + continue + } + + block, err := c.GetBlock(&hash) + if err != nil { + log.Warnf("Unable to get block %s from bitcoind: %s", + hash, err) + continue + } + + relevantTxes, err := c.filterBlock(block, header.Height, + false) + if len(relevantTxes) > 0 { + rescannedBlock := btcjson.RescannedBlock{ + Hash: hash.String(), + } + for _, tx := range relevantTxes { + rescannedBlock.Transactions = append( + rescannedBlock.Transactions, + hex.EncodeToString(tx.SerializedTx), + ) + } + rescannedBlocks = append(rescannedBlocks, + rescannedBlock) + } + } + return rescannedBlocks, nil +} + +// Rescan rescans from the block with the given hash until the current block, +// after adding the passed addresses and outpoints to the client's watch list. +func (c *BitcoindClient) Rescan(blockHash *chainhash.Hash, + addrs []btcutil.Address, outPoints []*wire.OutPoint) error { + + if blockHash == nil { + return errors.New("rescan requires a starting block hash") + } + + // Update addresses. + select { + case c.rescanUpdate <- addrs: + case <-c.quit: + return nil + } + + // Update outpoints. + select { + case c.rescanUpdate <- outPoints: + case <-c.quit: + return nil + } + + // Kick off the rescan with the starting block hash. + select { + case c.rescanUpdate <- blockHash: + case <-c.quit: + return nil + } + + return nil +} + +// SendRawTransaction sends a raw transaction via bitcoind. +func (c *BitcoindClient) SendRawTransaction(tx *wire.MsgTx, + allowHighFees bool) (*chainhash.Hash, error) { + + return c.client.SendRawTransaction(tx, allowHighFees) +} + +// Start attempts to establish a client connection with the remote server. +// If successful, handler goroutines are started to process notifications +// sent by the server. After a limited number of connection attempts, this +// function gives up, and therefore will not block forever waiting for the +// connection to be established to a server that may not exist. +func (c *BitcoindClient) Start() error { + // Verify that the server is running on the expected network. + net, err := c.GetCurrentNet() + if err != nil { + c.client.Disconnect() + return err + } + if net != c.chainParams.Net { + c.client.Disconnect() + return errors.New("mismatched networks") + } + + c.quitMtx.Lock() + c.started = true + c.quitMtx.Unlock() + + c.wg.Add(2) + go c.handler() + go c.socketHandler() + return nil +} + +// Stop disconnects the client and signals the shutdown of all goroutines +// started by Start. +func (c *BitcoindClient) Stop() { + c.quitMtx.Lock() + select { + case <-c.quit: + default: + close(c.quit) + c.client.Shutdown() + + if !c.started { + close(c.dequeueNotification) + } + } + c.quitMtx.Unlock() +} + +// WaitForShutdown blocks until both the client has finished disconnecting +// and all handlers have exited. +func (c *BitcoindClient) WaitForShutdown() { + c.client.WaitForShutdown() + c.wg.Wait() +} + +// Notifications returns a channel of parsed notifications sent by the remote +// bitcoin RPC server. This channel must be continually read or the process +// may abort for running out memory, as unread notifications are queued for +// later reads. +func (c *BitcoindClient) Notifications() <-chan interface{} { + return c.dequeueNotification +} + +// SetStartTime is a non-interface method to set the birthday of the wallet +// using this object. Since only a single rescan at a time is currently +// supported, only one birthday needs to be set. This does not fully restart a +// running rescan, so should not be used to update a rescan while it is running. +// TODO: When factoring out to multiple rescans per bitcoind client, add a +// birthday per client. +func (c *BitcoindClient) SetStartTime(startTime time.Time) { + c.clientMtx.Lock() + defer c.clientMtx.Unlock() + + c.startTime = startTime +} + +// BlockStamp returns the latest block notified by the client, or an error +// if the client has been shut down. +func (c *BitcoindClient) BlockStamp() (*waddrmgr.BlockStamp, error) { + select { + case bs := <-c.currentBlock: + return bs, nil + case <-c.quit: + return nil, errors.New("disconnected") + } +} + +func (c *BitcoindClient) onClientConnect() { + select { + case c.enqueueNotification <- ClientConnected{}: + case <-c.quit: + } +} + +func (c *BitcoindClient) onBlockConnected(hash *chainhash.Hash, height int32, time time.Time) { + select { + case c.enqueueNotification <- BlockConnected{ + Block: wtxmgr.Block{ + Hash: *hash, + Height: height, + }, + Time: time, + }: + case <-c.quit: + } +} + +func (c *BitcoindClient) onFilteredBlockConnected(height int32, + header *wire.BlockHeader, relevantTxs []*wtxmgr.TxRecord) { + select { + case c.enqueueNotification <- FilteredBlockConnected{ + Block: &wtxmgr.BlockMeta{ + Block: wtxmgr.Block{ + Hash: header.BlockHash(), + Height: height, + }, + Time: header.Timestamp, + }, + RelevantTxs: relevantTxs, + }: + case <-c.quit: + } +} + +func (c *BitcoindClient) onBlockDisconnected(hash *chainhash.Hash, height int32, time time.Time) { + select { + case c.enqueueNotification <- BlockDisconnected{ + Block: wtxmgr.Block{ + Hash: *hash, + Height: height, + }, + Time: time, + }: + case <-c.quit: + } +} + +func (c *BitcoindClient) onRelevantTx(rec *wtxmgr.TxRecord, + block *btcjson.BlockDetails) { + blk, err := parseBlock(block) + if err != nil { + // Log and drop improper notification. + log.Errorf("recvtx notification bad block: %v", err) + return + } + + select { + case c.enqueueNotification <- RelevantTx{rec, blk}: + case <-c.quit: + } +} + +func (c *BitcoindClient) onRescanProgress(hash *chainhash.Hash, height int32, blkTime time.Time) { + select { + case c.enqueueNotification <- &RescanProgress{hash, height, blkTime}: + case <-c.quit: + } +} + +func (c *BitcoindClient) onRescanFinished(hash *chainhash.Hash, height int32, blkTime time.Time) { + log.Infof("Rescan finished at %d (%s)", height, hash) + select { + case c.enqueueNotification <- &RescanFinished{hash, height, blkTime}: + case <-c.quit: + } + +} + +// socketHandler reads events from the ZMQ socket, processes them as +// appropriate, and queues them as btcd or neutrino would. +func (c *BitcoindClient) socketHandler() { + defer c.wg.Done() + + // Connect a ZMQ socket for block notifications + zmqCtx, err := zmq4.NewContext() + if err != nil { + log.Error(err) + return + } + defer func() { + if zmqCtx != nil { + err = zmqCtx.Term() + log.Infof("ZMQ context terminated: %v\n", err) + } + }() + zmqClient, err := zmqCtx.NewSocket(zmq4.SUB) + if err != nil { + log.Error(err) + return + } + defer func() { + if zmqClient != nil { + err = zmqClient.Close() + log.Infof("ZMQ socket closed: %v\n", err) + } + }() + err = zmqClient.SetSubscribe("rawblock") + if err != nil { + log.Error(err) + return + } + err = zmqClient.SetSubscribe("rawtx") + if err != nil { + log.Error(err) + return + } + err = zmqClient.SetRcvtimeo(c.zmqPollInterval) + err = zmqClient.Connect(c.zmqConnect) + if err != nil { + log.Error(err) + return + } + log.Infof("Started listening for blocks via ZMQ on %s", c.zmqConnect) + c.onClientConnect() + + // Get initial conditions. + bs, err := c.BlockStamp() + if err != nil { + log.Error(err) + return + } + +mainLoop: + for { + selectLoop: + for { + // Check for any requests before we poll events from + // bitcoind. + select { + + // Quit if requested + case <-c.quit: + return + + // Update our monitored watchlists or do a rescan. + case event := <-c.rescanUpdate: + switch e := event.(type) { + case struct{}: + // We're clearing the watchlists. + c.clientMtx.Lock() + c.watchAddrs = make(map[string]struct{}) + c.watchTxIDs = make(map[chainhash.Hash]struct{}) + c.watchOutPoints = + make(map[wire.OutPoint]struct{}) + c.clientMtx.Unlock() + case []btcutil.Address: + // We're updating monitored addresses. + c.clientMtx.Lock() + for _, addr := range e { + c.watchAddrs[addr.EncodeAddress()] = + struct{}{} + } + c.clientMtx.Unlock() + case []*wire.OutPoint: + // We're updating monitored outpoints + // from pointers. + c.clientMtx.Lock() + for _, op := range e { + c.watchOutPoints[*op] = struct{}{} + } + c.clientMtx.Unlock() + case []wire.OutPoint: + // We're updating monitored outpoints. + c.clientMtx.Lock() + for _, op := range e { + c.watchOutPoints[op] = struct{}{} + } + c.clientMtx.Unlock() + case []*chainhash.Hash: + // We're adding monitored TXIDs from + // pointers. + c.clientMtx.Lock() + for _, txid := range e { + c.watchTxIDs[*txid] = struct{}{} + } + c.clientMtx.Unlock() + case []chainhash.Hash: + // We're adding monitored TXIDs. + c.clientMtx.Lock() + for _, txid := range e { + c.watchTxIDs[txid] = struct{}{} + } + c.clientMtx.Unlock() + case *chainhash.Hash: + // We're rescanning from the passed + // hash. + err = c.rescan(e) + if err != nil { + log.Errorf("rescan failed: %s", + err) + } + } + default: + break selectLoop + } + } + + // Now, poll events from bitcoind. + msgBytes, err := zmqClient.RecvMessageBytes(0) + if err == zmq4.Errno(0xb) { // EAGAIN - timeout on recv + continue mainLoop + } + if err != nil { + log.Error(err) + continue mainLoop + } + + // We have an event! + switch string(msgBytes[0]) { + + // We have a transaction, so process it. + case "rawtx": + tx := &wire.MsgTx{} + err = tx.Deserialize(bytes.NewBuffer(msgBytes[1])) + if err != nil { + log.Error(err) + continue mainLoop + } + // filterTx automatically detects whether this tx has + // been mined and responds appropriately. + _, _, err := c.filterTx(tx, nil, true) + if err != nil { + log.Error(err) + } + + // We have a raw block, so we process it. + case "rawblock": + block := &wire.MsgBlock{} + err = block.Deserialize(bytes.NewBuffer(msgBytes[1])) + if err != nil { + log.Error(err) + continue mainLoop + } + + // Check if the block is logically next. If not, we + // have a reorg. + if block.Header.PrevBlock == bs.Hash { + // No reorg. Notify the subscriber of the block. + bs.Hash = block.BlockHash() + bs.Height++ + _, err = c.filterBlock(block, bs.Height, true) + if err != nil { + log.Error(err) + } + continue mainLoop + } + + // We have a reorg. + err = c.reorg(bs, block) + if err != nil { + log.Errorf("Error during reorg: %v", err) + } + + // Our event is not a block or other type we're + // watching, so we ignore it. + default: + } + } +} + +// reorg processes a reorganization during chain synchronization. This is +// separate from a rescan's handling of a reorg. +func (c *BitcoindClient) reorg(bs *waddrmgr.BlockStamp, block *wire.MsgBlock) error { + // We rewind until we find a common ancestor between the known chain + //and the current chain, and then fast forward again. This relies on + // being able to fetch both from bitcoind; to change that would require + // changes in downstream code. + // TODO: Make this more robust in order not to rely on this behavior. + log.Infof("Possible reorg at block %s", block.BlockHash()) + knownHeader, err := c.GetBlockHeader(&bs.Hash) + if err != nil { + return err + } + + // We also get the best known height based on the block which was + // notified. This way, we can preserve the chain of blocks we need to + // retrieve. + bestHash := block.BlockHash() + bestHeight, err := c.GetBlockHeight(&bestHash) + if err != nil { + return err + } + if bestHeight < bs.Height { + log.Warn("multiple reorgs in a row") + return nil + } + + // We track the block headers from the notified block to the current + // block at the known block height. This will let us fast-forward + // despite any future reorgs. + var reorgBlocks list.List + reorgBlocks.PushFront(block) + for i := bestHeight - 1; i >= bs.Height; i-- { + block, err = c.GetBlock(&block.Header.PrevBlock) + if err != nil { + return err + } + reorgBlocks.PushFront(block) + } + + // Now we rewind back to the last common ancestor block, using the + // prevblock hash from each header to avoid any race conditions. If we + // get more reorgs, they'll be queued and we'll repeat the cycle. + for block.Header.PrevBlock != knownHeader.PrevBlock { + log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash) + c.onBlockDisconnected(&bs.Hash, bs.Height, + knownHeader.Timestamp) + bs.Height-- + bs.Hash = knownHeader.PrevBlock + block, err = c.GetBlock(&block.Header.PrevBlock) + if err != nil { + return err + } + reorgBlocks.PushFront(block) + knownHeader, err = c.GetBlockHeader(&knownHeader.PrevBlock) + if err != nil { + return err + } + } + + // Disconnect the last block from the old chain. Since the PrevBlock is + // equal between the old and new chains, the tip will now be the last + // common ancestor. + log.Debugf("Disconnecting block %d (%s)", bs.Height, bs.Hash) + c.onBlockDisconnected(&bs.Hash, bs.Height, knownHeader.Timestamp) + bs.Height-- + + // Now we fast-forward to the notified block, notifying along the way. + for reorgBlocks.Front() != nil { + block = reorgBlocks.Front().Value.(*wire.MsgBlock) + bs.Height++ + bs.Hash = block.BlockHash() + c.filterBlock(block, bs.Height, true) + reorgBlocks.Remove(reorgBlocks.Front()) + } + + return nil +} + +// rescan performs a rescan of the chain using a bitcoind back-end, from the +// specified hash to the best-known hash, while watching out for reorgs that +// happen during the rescan. It uses the addresses and outputs being tracked +// by the client in the watch list. This is called only within a queue +// processing loop. +func (c *BitcoindClient) rescan(hash *chainhash.Hash) error { + // We start by getting the best already-processed block. We only use + // the height, as the hash can change during a reorganization, which we + // catch by testing connectivity from known blocks to the previous + // block. + log.Infof("Starting rescan from block %s", hash) + bestBlock, err := c.BlockStamp() + if err != nil { + return err + } + lastHeader, err := c.GetBlockHeaderVerbose(hash) + if err != nil { + return err + } + lastHash, err := chainhash.NewHashFromStr(lastHeader.Hash) + if err != nil { + return err + } + firstHeader := lastHeader + + headers := list.New() + headers.PushBack(lastHeader) + + // We always send a RescanFinished message when we're done. + defer c.onRescanFinished(lastHash, lastHeader.Height, time.Unix( + lastHeader.Time, 0)) + + // Cycle through all of the blocks known to bitcoind, being mindful of + // reorgs. + for i := firstHeader.Height + 1; i <= bestBlock.Height; i++ { + // Get the block at the current height. + hash, err := c.GetBlockHash(int64(i)) + if err != nil { + return err + } + + // This relies on the fact that bitcoind returns blocks from + // non-best chains it knows about. + // TODO: Make this more robust in order to not rely on this + // behavior. + // + // If the last known header isn't after the wallet birthday, + // try only fetching the next header and constructing a dummy + // block. If, in this event, the next header's timestamp is + // after the wallet birthday, go ahead and fetch the full block. + var block *wire.MsgBlock + c.clientMtx.RLock() + afterBirthday := lastHeader.Time >= c.startTime.Unix() + c.clientMtx.RUnlock() + if !afterBirthday { + header, err := c.GetBlockHeader(hash) + if err != nil { + return err + } + block = &wire.MsgBlock{ + Header: *header, + } + c.clientMtx.RLock() + afterBirthday = c.startTime.Before(header.Timestamp) + if afterBirthday { + c.onRescanProgress(lastHash, i, + block.Header.Timestamp) + } + c.clientMtx.RUnlock() + } + + if afterBirthday { + block, err = c.GetBlock(hash) + if err != nil { + return err + } + } + + if block.Header.PrevBlock.String() != lastHeader.Hash { + // We've been reorganized, maybe. We now walk backwards + // to a known block. If we go back past the passed + // block, we return an error. The initialization logic + // of the wallet should prevent that from happening. + for j := i - 1; j > firstHeader.Height; j-- { + hash, err = c.GetBlockHash(int64(j)) + if err != nil { + return err + } + + // If we've found a matching hash, we can move + // forward from there. + if hash.String() == lastHeader.Hash { + i = j + 1 + block, err = c.GetBlock(hash) + if err != nil { + return err + } + break + } + + // Rewind the rescan state. + c.onBlockDisconnected(lastHash, + lastHeader.Height, + time.Unix(lastHeader.Time, 0)) + headers.Remove(headers.Back()) + lastHeader = headers.Back().Value.(*btcjson. + GetBlockHeaderVerboseResult) + lastHash, err = chainhash.NewHashFromStr( + lastHeader.Hash) + if err != nil { + return err + } + } + + // Check again and make sure we're at the start of the + // reorg. + if block.Header.PrevBlock.String() != lastHeader.Hash { + return errors.New("reorg during rescan went " + + "too far back") + } + } + + // We are at the latest known block, so we notify. + lastHeader = &btcjson.GetBlockHeaderVerboseResult{ + Hash: block.BlockHash().String(), + Height: i, + PreviousHash: block.Header.PrevBlock.String(), + Time: block.Header.Timestamp.Unix(), + } + blockHash := block.BlockHash() + lastHash = &blockHash + headers.PushBack(lastHeader) + + _, err = c.filterBlock(block, i, true) + if err != nil { + return err + } + + if i%10000 == 0 { + c.onRescanProgress(lastHash, i, block.Header.Timestamp) + } + } + + return nil +} + +// filterBlock filters a block for watched outpoints and addresses, and returns +// any matching transactions, sending notifications along the way. +func (c *BitcoindClient) filterBlock(block *wire.MsgBlock, height int32, + notify bool) ([]*wtxmgr.TxRecord, error) { + // If we're earlier than wallet birthday, don't do any notifications. + c.clientMtx.RLock() + startTime := c.startTime + c.clientMtx.RUnlock() + if block.Header.Timestamp.Before(startTime) { + return nil, nil + } + + log.Debugf("Filtering block %d (%s) with %d transactions", height, + block.BlockHash(), len(block.Transactions)) + // Create block details for notifications. + blockHash := block.BlockHash() + blockDetails := &btcjson.BlockDetails{ + Hash: blockHash.String(), + Height: height, + Time: block.Header.Timestamp.Unix(), + } + + // Cycle through all transactions in the block. + var relevantTxs []*wtxmgr.TxRecord + blockConfirmed := make(map[chainhash.Hash]struct{}) + for i, tx := range block.Transactions { + // Update block and tx details for notifications. + blockDetails.Index = i + found, rec, err := c.filterTx(tx, blockDetails, notify) + if err != nil { + log.Warnf("Unable to filter tx: %v", err) + continue + } + if found { + relevantTxs = append(relevantTxs, rec) + blockConfirmed[tx.TxHash()] = struct{}{} + } + } + + // Update the expiration map by setting the block's confirmed + // transactions and deleting any in the mempool that were confirmed + // over 288 blocks ago. + c.clientMtx.Lock() + c.memPoolExp[height] = blockConfirmed + if oldBlock, ok := c.memPoolExp[height-288]; ok { + for txHash := range oldBlock { + delete(c.memPool, txHash) + } + delete(c.memPoolExp, height-288) + } + c.clientMtx.Unlock() + + if notify { + c.onFilteredBlockConnected(height, &block.Header, relevantTxs) + c.onBlockConnected(&blockHash, height, block.Header.Timestamp) + } + + return relevantTxs, nil +} + +// filterTx filters a single transaction against the client's watch list. +func (c *BitcoindClient) filterTx(tx *wire.MsgTx, + blockDetails *btcjson.BlockDetails, notify bool) (bool, + *wtxmgr.TxRecord, error) { + + txDetails := btcutil.NewTx(tx) + if blockDetails != nil { + txDetails.SetIndex(blockDetails.Index) + } + + rec, err := wtxmgr.NewTxRecordFromMsgTx(txDetails.MsgTx(), time.Now()) + if err != nil { + log.Errorf("Cannot create transaction record for relevant "+ + "tx: %v", err) + return false, nil, err + } + if blockDetails != nil { + rec.Received = time.Unix(blockDetails.Time, 0) + } + + var notifyTx bool + + // If we already know this is a relevant tx from a previous ntfn, we + // can shortcut the filter process and let the caller know the filter + // matches. + c.clientMtx.RLock() + if _, ok := c.memPool[tx.TxHash()]; ok { + c.clientMtx.RUnlock() + if notify && blockDetails != nil { + c.onRelevantTx(rec, blockDetails) + } + return true, rec, nil + } + c.clientMtx.RUnlock() + + // Cycle through outputs and check if we've matched a known address. + // Add any matched outpoints to watchOutPoints. + for i, out := range tx.TxOut { + _, addrs, _, err := txscript.ExtractPkScriptAddrs( + out.PkScript, c.chainParams) + if err != nil { + log.Debugf("Couldn't parse output script in %s:%d: %v", + tx.TxHash(), i, err) + continue + } + for _, addr := range addrs { + c.clientMtx.RLock() + if _, ok := c.watchAddrs[addr.EncodeAddress()]; ok { + notifyTx = true + c.watchOutPoints[wire.OutPoint{ + Hash: tx.TxHash(), + Index: uint32(i), + }] = struct{}{} + } + c.clientMtx.RUnlock() + } + } + + // If an output hasn't already matched, see if an input will. + if !notifyTx { + for _, in := range tx.TxIn { + c.clientMtx.RLock() + if _, ok := c.watchOutPoints[in.PreviousOutPoint]; ok { + c.clientMtx.RUnlock() + notifyTx = true + break + } + c.clientMtx.RUnlock() + } + } + + // If we have a match and it's not mined, notify the TX. If the TX is + // mined, we notify as part of FilteredBlockConnected. The boolean map + // value will let us know if we last saw it as mined or unmined. + if notifyTx { + c.clientMtx.Lock() + if _, ok := c.memPool[tx.TxHash()]; blockDetails == nil || !ok { + c.onRelevantTx(rec, blockDetails) + } + c.memPool[tx.TxHash()] = struct{}{} + c.clientMtx.Unlock() + } + + return notifyTx, rec, nil +} + +// handler maintains a queue of notifications and the current state (best +// block) of the chain. +func (c *BitcoindClient) handler() { + hash, height, err := c.GetBestBlock() + if err != nil { + log.Errorf("Failed to receive best block from chain server: %v", err) + c.Stop() + c.wg.Done() + return + } + + bs := &waddrmgr.BlockStamp{Hash: *hash, Height: height} + + // TODO: Rather than leaving this as an unbounded queue for all types of + // notifications, try dropping ones where a later enqueued notification + // can fully invalidate one waiting to be processed. For example, + // blockconnected notifications for greater block heights can remove the + // need to process earlier blockconnected notifications still waiting + // here. + + // TODO(aakselrod): Factor this logic out so it can be reused for each + // chain back end, rather than copying it. + + var notifications []interface{} + enqueue := c.enqueueNotification + var dequeue chan interface{} + var next interface{} +out: + for { + select { + case n, ok := <-enqueue: + if !ok { + // If no notifications are queued for handling, + // the queue is finished. + if len(notifications) == 0 { + break out + } + // nil channel so no more reads can occur. + enqueue = nil + continue + } + if len(notifications) == 0 { + next = n + dequeue = c.dequeueNotification + } + notifications = append(notifications, n) + + case dequeue <- next: + if n, ok := next.(BlockConnected); ok { + bs = &waddrmgr.BlockStamp{ + Height: n.Height, + Hash: n.Hash, + } + } + + notifications[0] = nil + notifications = notifications[1:] + if len(notifications) != 0 { + next = notifications[0] + } else { + // If no more notifications can be enqueued, the + // queue is finished. + if enqueue == nil { + break out + } + dequeue = nil + } + + case c.currentBlock <- bs: + + case <-c.quit: + break out + } + } + + c.Stop() + close(c.dequeueNotification) + c.wg.Done() +} diff --git a/chain/interface.go b/chain/interface.go index 1608d4d..d2fb82b 100644 --- a/chain/interface.go +++ b/chain/interface.go @@ -10,6 +10,16 @@ import ( "github.com/roasbeef/btcwallet/wtxmgr" ) +// BackEnds returns a list of the available back ends. +// TODO: Refactor each into a driver and use dynamic registration. +func BackEnds() []string { + return []string{ + "bitcoind", + "btcd", + "neutrino", + } +} + // Interface allows more than one backing blockchain source, such as a // btcd RPC chain server, or an SPV library, as long as we write a driver for // it. @@ -27,6 +37,7 @@ type Interface interface { NotifyReceived([]btcutil.Address) error NotifyBlocks() error Notifications() <-chan interface{} + BackEnd() string } // Notification types. These are defined here and processed from from reading diff --git a/chain/neutrino.go b/chain/neutrino.go index a82bc44..0803e4d 100644 --- a/chain/neutrino.go +++ b/chain/neutrino.go @@ -46,6 +46,11 @@ func NewNeutrinoClient(chainService *neutrino.ChainService) *NeutrinoClient { return &NeutrinoClient{CS: chainService} } +// BackEnd returns the name of the driver. +func (s *NeutrinoClient) BackEnd() string { + return "neutrino" +} + // Start replicates the RPC client's Start method. func (s *NeutrinoClient) Start() error { s.CS.Start() diff --git a/chain/rpc.go b/chain/rpc.go index 2c5f9f8..fc40456 100644 --- a/chain/rpc.go +++ b/chain/rpc.go @@ -84,6 +84,11 @@ func NewRPCClient(chainParams *chaincfg.Params, connect, user, pass string, cert return client, nil } +// BackEnd returns the name of the driver. +func (c *RPCClient) BackEnd() string { + return "btcd" +} + // Start attempts to establish a client connection with the remote server. // If successful, handler goroutines are started to process notifications // sent by the server. After a limited number of connection attempts, this diff --git a/glide.lock b/glide.lock index 9656174..0905a90 100644 --- a/glide.lock +++ b/glide.lock @@ -9,6 +9,7 @@ <<<<<<< HEAD <<<<<<< HEAD <<<<<<< HEAD +<<<<<<< HEAD hash: 2fe59efc96b0a2839297653da88cde89208f8f8cf4ced2bb1e828def57e3611b updated: 2017-07-19T11:33:58.0769452-04:00 ======= @@ -54,6 +55,10 @@ updated: 2017-11-03T12:30:39.76652975-07:00 hash: 8217b51c2a5fc4df53200de6315f4c1f4a084a516f38c77b815e0a02ac71e332 updated: 2017-11-18T15:17:20.95646706-08:00 >>>>>>> dc5f652... build: update to latest btcd and neutrino +======= +hash: 83bb1c0a0f5c6396d2387746544592662898af52596d5cdca898b6fbfaa60841 +updated: 2017-11-29T14:42:36.82153259-06:00 +>>>>>>> 7a32017... multi: add bitcoind back-end (only for API use for now) imports: - name: github.com/aead/siphash version: e404fcfc888570cadd1610538e2dbc89f66af814 @@ -174,6 +179,8 @@ imports: subpackages: - filterdb - headerfs +- name: github.com/pebbe/zmq4 + version: 90d69e412a09549f2e90bac70fbb449081f1e5c1 - name: github.com/roasbeef/btcd version: 5d9e4e1fa749fa2f1675802a4c9f10ef0ada04d1 subpackages: diff --git a/glide.yaml b/glide.yaml index 81f6e20..897c4a6 100644 --- a/glide.yaml +++ b/glide.yaml @@ -43,6 +43,7 @@ import: - package: github.com/jrick/logrotate subpackages: - rotator +- package: github.com/pebbe/zmq4 testImport: - package: github.com/davecgh/go-spew subpackages: diff --git a/wallet/notifications.go b/wallet/notifications.go index dcbf798..b63ba3a 100644 --- a/wallet/notifications.go +++ b/wallet/notifications.go @@ -198,7 +198,8 @@ func (s *NotificationServer) notifyUnminedTransaction(dbtx walletdb.ReadTx, deta // Sanity check: should not be currently coalescing a notification for // mined transactions at the same time that an unmined tx is notified. if s.currentTxNtfn != nil { - log.Errorf("Notifying unmined tx notification while creating notification for blocks") + log.Errorf("Notifying unmined tx notification (%s) while creating notification for blocks", + details.Hash) } defer s.mu.Unlock() diff --git a/wallet/wallet.go b/wallet/wallet.go index ca0c3f5..239ef45 100644 --- a/wallet/wallet.go +++ b/wallet/wallet.go @@ -163,6 +163,8 @@ func (w *Wallet) SynchronizeRPC(chainClient chain.Interface) { switch cc := chainClient.(type) { case *chain.NeutrinoClient: cc.SetStartTime(w.Manager.Birthday()) + case *chain.BitcoindClient: + cc.SetStartTime(w.Manager.Birthday()) } w.chainClientLock.Unlock() @@ -1436,6 +1438,12 @@ func (w *Wallet) GetTransactions(startBlock, endBlock *BlockIdentifier, cancel < switch client := chainClient.(type) { case *chain.RPCClient: startResp = client.GetBlockVerboseTxAsync(startBlock.hash) + case *chain.BitcoindClient: + var err error + start, err = client.GetBlockHeight(startBlock.hash) + if err != nil { + return nil, err + } case *chain.NeutrinoClient: var err error start, err = client.GetBlockHeight(startBlock.hash)