diff --git a/account.go b/account.go index b34b563..0b38a40 100644 --- a/account.go +++ b/account.go @@ -644,6 +644,11 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo log.Error("Tx Handler: Unspecified receiver.") return false } + receiverHash, _, err := btcutil.DecodeAddress(receiver) + if err != nil { + log.Errorf("Tx Handler: receiver address can not be decoded: %v", err) + return false + } height, ok := v["height"].(float64) if !ok { log.Error("Tx Handler: Unspecified height.") @@ -654,6 +659,11 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo log.Error("Tx Handler: Unspecified block hash.") return false } + blockHash, err := btcwire.NewShaHashFromStr(blockHashBE) + if err != nil { + log.Errorf("Tx Handler: Block hash string cannot be parsed: %v", err) + return false + } fblockIndex, ok := v["blockindex"].(float64) if !ok { log.Error("Tx Handler: Unspecified block index.") @@ -671,12 +681,17 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo log.Error("Tx Handler: Unspecified transaction hash.") return false } + txID, err := btcwire.NewShaHashFromStr(txhashBE) + if err != nil { + log.Errorf("Tx Handler: Tx hash string cannot be parsed: %v", err) + return false + } ftxOutIndex, ok := v["txoutindex"].(float64) if !ok { log.Error("Tx Handler: Unspecified transaction output index.") return false } - txOutIndex := int32(ftxOutIndex) + txOutIndex := uint32(ftxOutIndex) amt, ok := v["amount"].(float64) if !ok { log.Error("Tx Handler: Unspecified amount.") @@ -693,30 +708,22 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo spent = tspent } - // btcd sends the block and tx hashes as BE strings. Convert both - // to a LE ShaHash. - blockHash, err := btcwire.NewShaHashFromStr(blockHashBE) - if err != nil { - log.Errorf("Tx Handler: Block hash string cannot be parsed: %v", err) - return false - } - txID, err := btcwire.NewShaHashFromStr(txhashBE) - if err != nil { - log.Errorf("Tx Handler: Tx hash string cannot be parsed: %v", err) - return false - } - receiverHash, _, err := btcutil.DecodeAddress(receiver) - if err != nil { - log.Errorf("Tx Handler: receiver address can not be decoded: %v", err) - return false + if int32(height) != -1 { + worker := NotifyBalanceWorker{ + block: *blockHash, + wg: make(chan *sync.WaitGroup), + } + NotifyBalanceSyncerChans.add <- worker + wg := <-worker.wg + defer func() { + wg.Done() + }() } - // Add to TxStore. - // - // TODO(jrick): check for duplicates. This could occur if we're - // adding txs for an out of sync btcd on its IBD. + // Create RecvTx to add to tx history. t := &tx.RecvTx{ TxID: *txID, + TxOutIdx: txOutIndex, TimeReceived: time.Now().Unix(), BlockHeight: int32(height), BlockHash: *blockHash, @@ -726,44 +733,49 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo ReceiverHash: receiverHash, } + // For transactions originating from this wallet, the sent tx history should + // be recorded before the received history. If wallet created this tx, wait + // for the sent history to finish being recorded before continuing. + req := SendTxHistSyncRequest{ + txid: *txID, + response: make(chan SendTxHistSyncResponse), + } + SendTxHistSyncChans.access <- req + resp := <-req.response + if resp.ok { + // Wait until send history has been recorded. + <-resp.c + SendTxHistSyncChans.remove <- *txID + } + + // Actually record the tx history. a.TxStore.Lock() - txs := a.TxStore.s - a.TxStore.s = append(txs, t) + a.TxStore.s.InsertRecvTx(t) a.TxStore.dirty = true a.TxStore.Unlock() - // Notify frontends of new tx. - NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(), - int32(height), a.Wallet.Net())) + // Notify frontends of tx. If the tx is unconfirmed, it is always + // notified and the outpoint is marked as notified. If the outpoint + // has already been notified and is now in a block, a txmined notifiction + // should be sent once to let frontends that all previous send/recvs + // for this unconfirmed tx are now confirmed. + recvTxOP := btcwire.NewOutPoint(txID, txOutIndex) + previouslyNotifiedReq := NotifiedRecvTxRequest{ + op: *recvTxOP, + response: make(chan NotifiedRecvTxResponse), + } + NotifiedRecvTxChans.access <- previouslyNotifiedReq + if <-previouslyNotifiedReq.response { + NotifyMinedTx <- t + NotifiedRecvTxChans.remove <- *recvTxOP + } else { + // Notify frontends of new recv tx and mark as notified. + NotifiedRecvTxChans.add <- *recvTxOP + NotifyNewTxDetails(frontendNotificationMaster, a.Name(), t.TxInfo(a.Name(), + int32(height), a.Wallet.Net())) + } if !spent { - // First, iterate through all stored utxos. If an unconfirmed utxo - // (not present in a block) has the same outpoint as this utxo, - // update the block height and hash. - a.UtxoStore.RLock() - for _, u := range a.UtxoStore.s { - if bytes.Equal(u.Out.Hash[:], txID[:]) && u.Out.Index == uint32(txOutIndex) { - // Found a either a duplicate, or a change UTXO. If not change, - // ignore it. - a.UtxoStore.RUnlock() - if u.Height != -1 { - return false - } - - a.UtxoStore.Lock() - copy(u.BlockHash[:], blockHash[:]) - u.Height = int32(height) - a.UtxoStore.dirty = true - a.UtxoStore.Unlock() - - return false - } - } - a.UtxoStore.RUnlock() - - // After iterating through all UTXOs, it was not a duplicate or - // change UTXO appearing in a block. Append a new Utxo to the end. - u := &tx.Utxo{ Amt: uint64(amt), Height: int32(height), @@ -774,13 +786,18 @@ func (a *Account) newBlockTxOutHandler(result interface{}, e *btcjson.Error) boo copy(u.AddrHash[:], receiverHash) copy(u.BlockHash[:], blockHash[:]) a.UtxoStore.Lock() - a.UtxoStore.s = append(a.UtxoStore.s, u) + a.UtxoStore.s.Insert(u) a.UtxoStore.dirty = true a.UtxoStore.Unlock() - // If this notification came from mempool (TODO: currently - // unimplemented) notify the new unconfirmed balance immediately. - // Otherwise, wait until the blockconnection notifiation is processed. + // If this notification came from mempool, notify frontends of + // the new unconfirmed balance immediately. Otherwise, wait until + // the blockconnected notifiation is processed. + if u.Height == -1 { + bal := a.CalculateBalance(0) - a.CalculateBalance(1) + NotifyWalletBalanceUnconfirmed(frontendNotificationMaster, + a.name, bal) + } } // Never remove this handler. diff --git a/cmd.go b/cmd.go index 847e964..ba64275 100644 --- a/cmd.go +++ b/cmd.go @@ -239,6 +239,19 @@ func main() { // Begin generating new IDs for JSON calls. go JSONIDGenerator(NewJSONID) + // Begin maintanence goroutines. + go SendBeforeReceiveHistorySync(SendTxHistSyncChans.add, + SendTxHistSyncChans.done, + SendTxHistSyncChans.remove, + SendTxHistSyncChans.access) + go StoreNotifiedMempoolRecvTxs(NotifiedRecvTxChans.add, + NotifiedRecvTxChans.remove, + NotifiedRecvTxChans.access) + go NotifyMinedTxSender(NotifyMinedTx) + go NotifyBalanceSyncer(NotifyBalanceSyncerChans.add, + NotifyBalanceSyncerChans.remove, + NotifyBalanceSyncerChans.access) + for { replies := make(chan error) done := make(chan int) diff --git a/cmdmgr.go b/cmdmgr.go index 63e577c..83ec6b0 100644 --- a/cmdmgr.go +++ b/cmdmgr.go @@ -27,6 +27,7 @@ import ( "github.com/conformal/btcwallet/wallet" "github.com/conformal/btcwire" "github.com/conformal/btcws" + "sync" "time" ) @@ -636,7 +637,7 @@ func SendFrom(frontend chan []byte, icmd btcjson.Cmd) { } // If a change address was added, mark wallet as dirty, sync to disk, - // and Request updates for change address. + // and request updates for change address. if len(createdTx.changeAddr) != 0 { a.dirty = true if err := a.writeDirtyToDisk(); err != nil { @@ -751,6 +752,10 @@ func SendMany(frontend chan []byte, icmd btcjson.Cmd) { return } + // Mark txid as having send history so handlers adding receive history + // wait until all send history has been written. + SendTxHistSyncChans.add <- createdTx.txid + // Set up a reply handler to respond to the btcd reply. replyHandlers.Lock() replyHandlers.m[n] = func(result interface{}, err *btcjson.Error) bool { @@ -763,6 +768,61 @@ func SendMany(frontend chan []byte, icmd btcjson.Cmd) { btcdMsgs <- m } +// Channels to manage SendBeforeReceiveHistorySync. +var SendTxHistSyncChans = struct { + add, done, remove chan btcwire.ShaHash + access chan SendTxHistSyncRequest +}{ + add: make(chan btcwire.ShaHash), + remove: make(chan btcwire.ShaHash), + done: make(chan btcwire.ShaHash), + access: make(chan SendTxHistSyncRequest), +} + +// SendTxHistSyncRequest requests a SendTxHistSyncResponse from +// SendBeforeReceiveHistorySync. +type SendTxHistSyncRequest struct { + txid btcwire.ShaHash + response chan SendTxHistSyncResponse +} + +// SendTxHistSyncResponse is the response +type SendTxHistSyncResponse struct { + c chan struct{} + ok bool +} + +// SendBeforeReceiveHistorySync manages a set of transaction hashes +// created by this wallet. For each newly added txid, a channel is +// created. Once the send history has been recorded, the txid should +// be messaged across done, causing the internal channel to be closed. +// Before receive history is recorded, access should be used to check +// if there are or were any goroutines writing send history, and if +// so, wait until the channel is closed after a done message. +func SendBeforeReceiveHistorySync(add, done, remove chan btcwire.ShaHash, + access chan SendTxHistSyncRequest) { + + m := make(map[btcwire.ShaHash]chan struct{}) + for { + select { + case txid := <-add: + m[txid] = make(chan struct{}) + + case txid := <-remove: + delete(m, txid) + + case txid := <-done: + if c, ok := m[txid]; ok { + close(c) + } + + case req := <-access: + c, ok := m[req.txid] + req.response <- SendTxHistSyncResponse{c: c, ok: ok} + } + } +} + func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd, result interface{}, e *btcjson.Error, a *Account, txInfo *CreatedTx) bool { @@ -770,6 +830,7 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd, if e != nil { log.Errorf("Could not send tx: %v", e.Message) ReplyError(frontend, icmd.Id(), e) + SendTxHistSyncChans.remove <- txInfo.txid return true } @@ -780,6 +841,7 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd, Message: "Unexpected type from btcd reply", } ReplyError(frontend, icmd.Id(), e) + SendTxHistSyncChans.remove <- txInfo.txid return true } txID, err := btcwire.NewShaHashFromStr(txIDStr) @@ -789,6 +851,7 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd, Message: "Invalid hash string from btcd reply", } ReplyError(frontend, icmd.Id(), e) + SendTxHistSyncChans.remove <- txInfo.txid return true } @@ -814,17 +877,13 @@ func handleSendRawTxReply(frontend chan []byte, icmd btcjson.Cmd, } } + // Signal that received notifiations are ok to add now. + SendTxHistSyncChans.done <- txInfo.txid + // Remove previous unspent outputs now spent by the tx. a.UtxoStore.Lock() modified := a.UtxoStore.s.Remove(txInfo.inputs) a.UtxoStore.dirty = a.UtxoStore.dirty || modified - - // Add unconfirmed change utxo (if any) to UtxoStore. - if txInfo.changeUtxo != nil { - a.UtxoStore.s = append(a.UtxoStore.s, txInfo.changeUtxo) - a.ReqSpentUtxoNtfn(txInfo.changeUtxo) - a.UtxoStore.dirty = true - } a.UtxoStore.Unlock() // Disk sync tx and utxo stores. @@ -1111,3 +1170,140 @@ func NotifyNewTxDetails(frontend chan []byte, account string, mntfn, _ := ntfn.MarshalJSON() frontend <- mntfn } + +// NotifiedRecvTxRequest is used to check whether the outpoint of +// a received transaction has already been notified due to +// arriving first in the btcd mempool. +type NotifiedRecvTxRequest struct { + op btcwire.OutPoint + response chan NotifiedRecvTxResponse +} + +// NotifiedRecvTxResponse is the response of a NotifiedRecvTxRequest +// request. +type NotifiedRecvTxResponse bool + +// NotifiedRecvTxChans holds the channels to manage +// StoreNotifiedMempoolTxs. +var NotifiedRecvTxChans = struct { + add, remove chan btcwire.OutPoint + access chan NotifiedRecvTxRequest +}{ + add: make(chan btcwire.OutPoint), + remove: make(chan btcwire.OutPoint), + access: make(chan NotifiedRecvTxRequest), +} + +// StoreNotifiedMempoolRecvTxs maintains a set of previously-sent +// received transaction notifications originating from the btcd +// mempool. This is used to prevent duplicate frontend transaction +// notifications once a mempool tx is mined into a block. +func StoreNotifiedMempoolRecvTxs(add, remove chan btcwire.OutPoint, + access chan NotifiedRecvTxRequest) { + + m := make(map[btcwire.OutPoint]struct{}) + for { + select { + case op := <-add: + m[op] = struct{}{} + + case op := <-remove: + if _, ok := m[op]; ok { + delete(m, op) + } + + case req := <-access: + _, ok := m[req.op] + req.response <- NotifiedRecvTxResponse(ok) + } + } +} + +// Channel to send received transactions that were previously +// notified to frontends by the mempool. A TxMined notification +// is sent to all connected frontends detailing the block information +// about the now confirmed transaction. +var NotifyMinedTx = make(chan *tx.RecvTx) + +// NotifyMinedTxSender reads received transactions from in, notifying +// frontends that the tx has now been confirmed in a block. Duplicates +// are filtered out. +func NotifyMinedTxSender(in chan *tx.RecvTx) { + // Create a map to hold a set of already notified + // txids. Do not send duplicates. + m := make(map[btcwire.ShaHash]struct{}) + + for recv := range in { + if _, ok := m[recv.TxID]; !ok { + ntfn := btcws.NewTxMinedNtfn(recv.TxID.String(), + recv.BlockHash.String(), recv.BlockHeight, + recv.BlockTime, int(recv.BlockIndex)) + mntfn, _ := ntfn.MarshalJSON() + frontendNotificationMaster <- mntfn + + // Mark as sent. + m[recv.TxID] = struct{}{} + } + } +} + +// NotifyBalanceSyncerChans holds channels for accessing +// the NotifyBalanceSyncer goroutine. +var NotifyBalanceSyncerChans = struct { + add chan NotifyBalanceWorker + remove chan btcwire.ShaHash + access chan NotifyBalanceRequest +}{ + add: make(chan NotifyBalanceWorker), + remove: make(chan btcwire.ShaHash), + access: make(chan NotifyBalanceRequest), +} + +// NotifyBalanceWorker holds a block hash to add a worker to +// NotifyBalanceSyncer and uses a chan to returns the WaitGroup +// which should be decremented with Done after the worker is finished. +type NotifyBalanceWorker struct { + block btcwire.ShaHash + wg chan *sync.WaitGroup +} + +// NotifyBalanceRequest is used by the blockconnected notification handler +// to access and wait on the the WaitGroup for workers currently processing +// transactions for a block. If no handlers have been added, a nil +// WaitGroup is returned. +type NotifyBalanceRequest struct { + block btcwire.ShaHash + wg chan *sync.WaitGroup +} + +// NotifyBalanceSyncer maintains a map of block hashes to WaitGroups +// for worker goroutines that must finish before it is safe to notify +// frontends of a new balance in the blockconnected notification handler. +func NotifyBalanceSyncer(add chan NotifyBalanceWorker, + remove chan btcwire.ShaHash, + access chan NotifyBalanceRequest) { + + m := make(map[btcwire.ShaHash]*sync.WaitGroup) + + for { + select { + case worker := <-add: + wg, ok := m[worker.block] + if !ok { + wg = &sync.WaitGroup{} + m[worker.block] = wg + } + wg.Add(1) + m[worker.block] = wg + worker.wg <- wg + + case block := <-remove: + if _, ok := m[block]; ok { + delete(m, block) + } + + case req := <-access: + req.wg <- m[req.block] + } + } +} diff --git a/createtx.go b/createtx.go index 0efc849..abf1012 100644 --- a/createtx.go +++ b/createtx.go @@ -65,6 +65,7 @@ var TxFeeIncrement = struct { // for change (if any). type CreatedTx struct { rawTx []byte + txid btcwire.ShaHash time time.Time inputs []*tx.Utxo outputs []tx.Pair @@ -326,6 +327,7 @@ func (a *Account) txToPairs(pairs map[string]int64, minconf int) (*CreatedTx, er out := tx.Pair{ Amount: int64(change), PubkeyHash: changeAddrHash, + Change: true, } outputs = append(outputs, out) @@ -357,10 +359,16 @@ func (a *Account) txToPairs(pairs map[string]int64, minconf int) (*CreatedTx, er } } + txid, err := msgtx.TxSha() + if err != nil { + return nil, fmt.Errorf("cannot create txid for created tx: %v", err) + } + buf := new(bytes.Buffer) msgtx.BtcEncode(buf, btcwire.ProtocolVersion) info := &CreatedTx{ rawTx: buf.Bytes(), + txid: txid, time: time.Now(), inputs: selectedInputs, outputs: outputs, diff --git a/sockets.go b/sockets.go index 582ab78..82ddb6f 100644 --- a/sockets.go +++ b/sockets.go @@ -639,8 +639,16 @@ func NtfnBlockConnected(n btcjson.Cmd, marshaled []byte) { // btcd notifies btcwallet about transactions first, and then sends // the new block notification. New balance notifications for txs // in blocks are therefore sent here after all tx notifications - // have arrived. - + // have arrived and finished being processed by the handlers. + workers := NotifyBalanceRequest{ + block: *hash, + wg: make(chan *sync.WaitGroup), + } + NotifyBalanceSyncerChans.access <- workers + if wg := <-workers.wg; wg != nil { + wg.Wait() + NotifyBalanceSyncerChans.remove <- *hash + } accountstore.BlockNotify(bs) // Pass notification to frontends too. diff --git a/tx/tx.go b/tx/tx.go index 1e739ef..9b58aaa 100644 --- a/tx/tx.go +++ b/tx/tx.go @@ -43,10 +43,32 @@ const ( sendTxHeader ) -// File format versions. -const ( - utxoFileVersion uint32 = 0 - txFileVersion uint32 = 0 +// ReaderFromVersion is an io.ReaderFrom and io.WriterTo that +// can specify any particular wallet file format for reading +// depending on the wallet file version. +type ReaderFromVersion interface { + ReadFromVersion(uint32, io.Reader) (int64, error) + io.WriterTo +} + +// Various versions. +var ( + // First file version used. + utxoVersFirst uint32 = 0 + txVersFirst uint32 = 0 + + // txVersRecvTxIndex is the version where the txout index + // was added to the RecvTx struct. + txVersRecvTxIndex uint32 = 1 + + // txVersMarkSentChange is the version where serialized SentTx + // added a flags field, used for marking a sent transaction + // as change. + txVersMarkSentChange uint32 = 2 + + // Current versions + utxoVersCurrent = utxoVersFirst + txVersCurrent = txVersRecvTxIndex ) // UtxoStore is a type used for holding all Utxo structures for all @@ -159,6 +181,7 @@ func (p *pubkeyHash) WriteTo(w io.Writer) (int64, error) { // received by an address in a wallet. type RecvTx struct { TxID btcwire.ShaHash + TxOutIdx uint32 TimeReceived int64 BlockHeight int32 BlockHash btcwire.ShaHash @@ -178,8 +201,7 @@ var pairsVar = Pairs([]Pair{}) var _ io.ReaderFrom = &pairsVar var _ io.WriterTo = &pairsVar -// ReadFrom reades a Pair slice from r. Part of the io.ReaderFrom interface. -func (p *Pairs) ReadFrom(r io.Reader) (int64, error) { +func (p *Pairs) ReadFromVersion(vers uint32, r io.Reader) (int64, error) { var read int64 nPairsBytes := make([]byte, 4) // Raw bytes for a uint32. @@ -192,7 +214,7 @@ func (p *Pairs) ReadFrom(r io.Reader) (int64, error) { s := make([]Pair, nPairs) for i := range s { - n, err := s[i].ReadFrom(r) + n, err := s[i].ReadFromVersion(vers, r) if err != nil { return read + n, err } @@ -203,6 +225,10 @@ func (p *Pairs) ReadFrom(r io.Reader) (int64, error) { return read, nil } +func (p *Pairs) ReadFrom(r io.Reader) (int64, error) { + return p.ReadFromVersion(txVersCurrent, r) +} + // WriteTo writes a Pair slice to w. Part of the io.WriterTo interface. func (p *Pairs) WriteTo(w io.Writer) (int64, error) { var written int64 @@ -234,6 +260,7 @@ func (p *Pairs) WriteTo(w io.Writer) (int64, error) { type Pair struct { PubkeyHash pubkeyHash Amount int64 // Measured in Satoshis + Change bool } // Enforce that Pair satisifies the io.ReaderFrom and io.WriterTo @@ -241,6 +268,32 @@ type Pair struct { var _ io.ReaderFrom = &Pair{} var _ io.WriterTo = &Pair{} +func (p *Pair) ReadFromVersion(vers uint32, r io.Reader) (int64, error) { + if vers >= txVersMarkSentChange { + // Use latest version + return p.ReadFrom(r) + } + + // Old version did not read flags. + var read int64 + + n, err := p.PubkeyHash.ReadFrom(r) + if err != nil { + return n, err + } + read += n + + amountBytes := make([]byte, 8) // raw bytes for a uint64 + nr, err := r.Read(amountBytes) + if err != nil { + return read + int64(nr), err + } + read += int64(nr) + p.Amount = int64(binary.LittleEndian.Uint64(amountBytes)) + + return read, nil +} + // ReadFrom reads a serialized Pair from r. Part of the io.ReaderFrom // interface. func (p *Pair) ReadFrom(r io.Reader) (int64, error) { @@ -260,6 +313,15 @@ func (p *Pair) ReadFrom(r io.Reader) (int64, error) { read += int64(nr) p.Amount = int64(binary.LittleEndian.Uint64(amountBytes)) + // Read flags. + flags := make([]byte, 1) // raw bytes for 1 byte of flags + nr, err = r.Read(flags) + if err != nil { + return read + int64(nr), err + } + read += int64(nr) + p.Change = flags[0]&1<<0 == 1<<0 + return read, nil } @@ -363,7 +425,7 @@ func (u *UtxoStore) WriteTo(w io.Writer) (int64, error) { // Write file version. This is currently not used. versionBytes := make([]byte, 4) // bytes for a uint32 - binary.LittleEndian.PutUint32(versionBytes, utxoFileVersion) + binary.LittleEndian.PutUint32(versionBytes, utxoVersCurrent) n, err := w.Write(versionBytes) if err != nil { return int64(n), err @@ -383,6 +445,30 @@ func (u *UtxoStore) WriteTo(w io.Writer) (int64, error) { return written, nil } +// Insert inserts an Utxo into the store. +func (u *UtxoStore) Insert(utxo *Utxo) { + s := *u + defer func() { + *u = s + }() + + // First, iterate through all stored utxos. If an unconfirmed utxo + // (not present in a block) has the same outpoint as this utxo, + // update the block height and hash. + for i := range s { + if bytes.Equal(s[i].Out.Hash[:], utxo.Out.Hash[:]) && s[i].Out.Index == utxo.Out.Index { + // Fill relevant block information. + copy(s[i].BlockHash[:], utxo.BlockHash[:]) + s[i].Height = utxo.Height + return + } + } + + // After iterating through all UTXOs, it was not a duplicate or + // change UTXO appearing in a block. Append a new Utxo to the end. + s = append(s, utxo) +} + // Rollback removes all utxos from and after the block specified // by a block height and hash. // @@ -619,6 +705,7 @@ func (txs *TxStore) ReadFrom(r io.Reader) (int64, error) { if err != nil { return int64(n), err } + vers := binary.LittleEndian.Uint32(versionBytes) read += int64(n) store := []interface{}{} @@ -639,24 +726,30 @@ func (txs *TxStore) ReadFrom(r io.Reader) (int64, error) { read += n var tx io.ReaderFrom + // Read tx. switch header { case recvTxHeader: - tx = new(RecvTx) + t := new(RecvTx) + n, err = t.ReadFromVersion(vers, r) + if err != nil { + return read + n, err + } + read += n + tx = t case sendTxHeader: - tx = new(SendTx) + t := new(SendTx) + n, err = t.ReadFromVersion(vers, r) + if err != nil { + return read + n, err + } + read += n + tx = t default: return n, fmt.Errorf("unknown Tx header") } - // Read tx - n, err = tx.ReadFrom(r) - if err != nil { - return read + n, err - } - read += n - store = append(store, tx) } } @@ -671,7 +764,7 @@ func (txs *TxStore) WriteTo(w io.Writer) (int64, error) { // Write file version. This is currently not used. versionBytes := make([]byte, 4) // bytes for a uint32 - binary.LittleEndian.PutUint32(versionBytes, utxoFileVersion) + binary.LittleEndian.PutUint32(versionBytes, utxoVersCurrent) n, err := w.Write(versionBytes) if err != nil { return int64(n), err @@ -708,6 +801,41 @@ func (txs *TxStore) WriteTo(w io.Writer) (int64, error) { return written, nil } +// InsertRecvTx inserts a RecvTx, checking for duplicates, and updating +// previous entries with the latest block information in tx. +func (txs *TxStore) InsertRecvTx(tx *RecvTx) { + s := *txs + defer func() { + *txs = s + }() + + // First, iterate through all stored tx history. If a received tx + // matches the one being added (equal txid and txout idx), update + // it with the new block information. + for i := range s { + recvTx, ok := s[i].(*RecvTx) + if !ok { + // Can only check for equality if the types match. + continue + } + + // Found an identical received tx. + if bytes.Equal(recvTx.TxID[:], tx.TxID[:]) && + recvTx.TxOutIdx == tx.TxOutIdx { + + // Fill relevant block information. + copy(recvTx.BlockHash[:], tx.BlockHash[:]) + recvTx.BlockHeight = tx.BlockHeight + recvTx.BlockIndex = tx.BlockIndex + recvTx.BlockTime = tx.BlockTime + return + } + } + + // No received tx entries with the same outpoint. Append to the end. + s = append(s, tx) +} + // Rollback removes all txs from and after the block specified by a // block height and hash. // @@ -755,10 +883,47 @@ func (txs *TxStore) Rollback(height int32, hash *btcwire.ShaHash) (modified bool return } +func (tx *RecvTx) ReadFromVersion(vers uint32, r io.Reader) (n int64, err error) { + if vers >= txVersCurrent { + // Use current version. + return tx.ReadFrom(r) + } + + // Old file version did not save the txout index. + + datas := []interface{}{ + &tx.TxID, + // tx index not read. + &tx.TimeReceived, + &tx.BlockHeight, + &tx.BlockHash, + &tx.BlockIndex, + &tx.BlockTime, + &tx.Amount, + &tx.ReceiverHash, + } + var read int64 + for _, data := range datas { + switch e := data.(type) { + case io.ReaderFrom: + read, err = e.ReadFrom(r) + default: + read, err = binaryRead(r, binary.LittleEndian, data) + } + + if err != nil { + return n + read, err + } + n += read + } + return n, nil +} + // ReadFrom satisifies the io.ReaderFrom interface. A RecTx is read // in from r with the format: // // TxID (32 bytes) +// TxOutIdx (4 bytes, little endian) // TimeReceived (8 bytes, little endian) // BlockHeight (4 bytes, little endian) // BlockHash (32 bytes) @@ -769,6 +934,7 @@ func (txs *TxStore) Rollback(height int32, hash *btcwire.ShaHash) (modified bool func (tx *RecvTx) ReadFrom(r io.Reader) (n int64, err error) { datas := []interface{}{ &tx.TxID, + &tx.TxOutIdx, &tx.TimeReceived, &tx.BlockHeight, &tx.BlockHash, @@ -857,23 +1023,14 @@ func (tx *RecvTx) TxInfo(account string, curheight int32, txInfo["blockindex"] = tx.BlockIndex txInfo["blocktime"] = tx.BlockTime txInfo["confirmations"] = curheight - tx.BlockHeight + 1 + } else { + txInfo["confirmations"] = 0 } return txInfo } -// ReadFrom satisifies the io.WriterTo interface. A SendTx is read -// from r with the format: -// -// TxID (32 bytes) -// Time (8 bytes, little endian) -// BlockHeight (4 bytes, little endian) -// BlockHash (32 bytes) -// BlockIndex (4 bytes, little endian) -// BlockTime (8 bytes, little endian) -// Fee (8 bytes, little endian) -// Receivers (varies) -func (tx *SendTx) ReadFrom(r io.Reader) (n int64, err error) { +func (tx *SendTx) ReadFromVersion(vers uint32, r io.Reader) (n int64, err error) { var read int64 datas := []interface{}{ @@ -903,6 +1060,21 @@ func (tx *SendTx) ReadFrom(r io.Reader) (n int64, err error) { return n, nil } +// ReadFrom satisifies the io.WriterTo interface. A SendTx is read +// from r with the format: +// +// TxID (32 bytes) +// Time (8 bytes, little endian) +// BlockHeight (4 bytes, little endian) +// BlockHash (32 bytes) +// BlockIndex (4 bytes, little endian) +// BlockTime (8 bytes, little endian) +// Fee (8 bytes, little endian) +// Receivers (varies) +func (tx *SendTx) ReadFrom(r io.Reader) (n int64, err error) { + return tx.ReadFromVersion(txVersCurrent, r) +} + // WriteTo satisifies the io.WriterTo interface. A SendTx is written to // w in the format: //