diff --git a/peer/log.go b/peer/log.go index 755673f2..7469fd86 100644 --- a/peer/log.go +++ b/peer/log.go @@ -91,8 +91,12 @@ func invSummary(invList []*wire.InvVect) string { switch iv.Type { case wire.InvTypeError: return fmt.Sprintf("error %s", iv.Hash) + case wire.InvTypeWitnessBlock: + return fmt.Sprintf("witness block %s", iv.Hash) case wire.InvTypeBlock: return fmt.Sprintf("block %s", iv.Hash) + case wire.InvTypeWitnessTx: + return fmt.Sprintf("witness tx %s", iv.Hash) case wire.InvTypeTx: return fmt.Sprintf("tx %s", iv.Hash) } diff --git a/peer/peer.go b/peer/peer.go index 946dc793..0d4a932a 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -297,6 +297,7 @@ func newNetAddress(addr net.Addr, services wire.ServiceFlag) (*wire.NetAddress, type outMsg struct { msg wire.Message doneChan chan<- struct{} + encoding wire.MessageEncoding } // stallControlCmd represents the command of a stall control message. @@ -412,6 +413,8 @@ type Peer struct { sendHeadersPreferred bool // peer sent a sendheaders message verAckReceived bool + wireEncoding wire.MessageEncoding + knownInventory *mruInventoryMap prevGetBlocksMtx sync.Mutex prevGetBlocksBegin *chainhash.Hash @@ -1016,14 +1019,14 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { wire.MultipleAddressVersion) rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete, reason) - return p.writeMessage(rejectMsg) + return p.writeMessage(rejectMsg, wire.LatestEncoding) } - // Updating a bunch of stats. + // Updating a bunch of stats including block based stats, and the + // peer's time offset. p.statsMtx.Lock() p.lastBlock = msg.LastBlock p.startingHeight = msg.LastBlock - // Set the peer's time offset. p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix() p.statsMtx.Unlock() @@ -1034,14 +1037,27 @@ func (p *Peer) handleRemoteVersionMsg(msg *wire.MsgVersion) error { p.versionKnown = true log.Debugf("Negotiated protocol version %d for peer %s", p.protocolVersion, p) + // Set the peer's ID. p.id = atomic.AddInt32(&nodeCount, 1) + // Set the supported services for the peer to what the remote peer // advertised. p.services = msg.Services + // Set the remote peer's user agent. p.userAgent = msg.UserAgent p.flagsMtx.Unlock() + + // Once the version message has been exchanged, we're able to determine + // if this peer knows how to encode witness data over the wire + // protocol. If so, then we'll switch to a decoding mode which is + // prepared for the new transaction format introduced as part of + // BIP0144. + if p.services&wire.SFNodeWitness == wire.SFNodeWitness { + p.wireEncoding = wire.WitnessEncoding + } + return nil } @@ -1081,9 +1097,9 @@ func (p *Peer) handlePongMsg(msg *wire.MsgPong) { } // readMessage reads the next bitcoin message from the peer with logging. -func (p *Peer) readMessage() (wire.Message, []byte, error) { - n, msg, buf, err := wire.ReadMessageN(p.conn, p.ProtocolVersion(), - p.cfg.ChainParams.Net) +func (p *Peer) readMessage(encoding wire.MessageEncoding) (wire.Message, []byte, error) { + n, msg, buf, err := wire.ReadMessageWithEncodingN(p.conn, + p.ProtocolVersion(), p.cfg.ChainParams.Net, encoding) atomic.AddUint64(&p.bytesReceived, uint64(n)) if p.cfg.Listeners.OnRead != nil { p.cfg.Listeners.OnRead(p, n, msg, err) @@ -1114,7 +1130,7 @@ func (p *Peer) readMessage() (wire.Message, []byte, error) { } // writeMessage sends a bitcoin message to the peer with logging. -func (p *Peer) writeMessage(msg wire.Message) error { +func (p *Peer) writeMessage(msg wire.Message, enc wire.MessageEncoding) error { // Don't do anything if we're disconnecting. if atomic.LoadInt32(&p.disconnect) != 0 { return nil @@ -1136,8 +1152,8 @@ func (p *Peer) writeMessage(msg wire.Message) error { })) log.Tracef("%v", newLogClosure(func() string { var buf bytes.Buffer - err := wire.WriteMessage(&buf, msg, p.ProtocolVersion(), - p.cfg.ChainParams.Net) + _, err := wire.WriteMessageWithEncodingN(&buf, msg, p.ProtocolVersion(), + p.cfg.ChainParams.Net, enc) if err != nil { return err.Error() } @@ -1145,8 +1161,8 @@ func (p *Peer) writeMessage(msg wire.Message) error { })) // Write the message to the peer. - n, err := wire.WriteMessageN(p.conn, msg, p.ProtocolVersion(), - p.cfg.ChainParams.Net) + n, err := wire.WriteMessageWithEncodingN(p.conn, msg, + p.ProtocolVersion(), p.cfg.ChainParams.Net, enc) atomic.AddUint64(&p.bytesSent, uint64(n)) if p.cfg.Listeners.OnWrite != nil { p.cfg.Listeners.OnWrite(p, n, msg, err) @@ -1408,7 +1424,7 @@ out: // Read a message and stop the idle timer as soon as the read // is done. The timer is reset below for the next iteration if // needed. - rmsg, buf, err := p.readMessage() + rmsg, buf, err := p.readMessage(p.wireEncoding) idleTimer.Stop() if err != nil { // In order to allow regression tests with malformed messages, don't @@ -1768,7 +1784,9 @@ out: } p.stallControl <- stallControlMsg{sccSendMessage, msg.msg} - if err := p.writeMessage(msg.msg); err != nil { + + err := p.writeMessage(msg.msg, msg.encoding) + if err != nil { p.Disconnect() if p.shouldLogWriteError(err) { log.Errorf("Failed to send message to "+ @@ -1844,6 +1862,18 @@ out: // // This function is safe for concurrent access. func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) { + p.QueueMessageWithEncoding(msg, doneChan, wire.BaseEncoding) +} + +// QueueMessageWithEncoding adds the passed bitcoin message to the peer send +// queue. This function is identical to QueueMessage, however it allows the +// caller to specify the wire encoding type that should be used when +// encoding/decoding blocks and transactions. +// +// This function is safe for concurrent access. +func (p *Peer) QueueMessageWithEncoding(msg wire.Message, doneChan chan<- struct{}, + encoding wire.MessageEncoding) { + // Avoid risk of deadlock if goroutine already exited. The goroutine // we will be sending to hangs around until it knows for a fact that // it is marked as disconnected and *then* it drains the channels. @@ -1855,7 +1885,7 @@ func (p *Peer) QueueMessage(msg wire.Message, doneChan chan<- struct{}) { } return } - p.outputQueue <- outMsg{msg: msg, doneChan: doneChan} + p.outputQueue <- outMsg{msg: msg, encoding: encoding, doneChan: doneChan} } // QueueInventory adds the passed inventory to the inventory send queue which @@ -1987,7 +2017,7 @@ func (p *Peer) WaitForDisconnect() { // acceptable then return an error. func (p *Peer) readRemoteVersionMsg() error { // Read their version message. - msg, _, err := p.readMessage() + msg, _, err := p.readMessage(wire.LatestEncoding) if err != nil { return err } @@ -1999,7 +2029,7 @@ func (p *Peer) readRemoteVersionMsg() error { rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectMalformed, errStr) - return p.writeMessage(rejectMsg) + return p.writeMessage(rejectMsg, wire.LatestEncoding) } if err := p.handleRemoteVersionMsg(remoteVerMsg); err != nil { @@ -2019,7 +2049,7 @@ func (p *Peer) writeLocalVersionMsg() error { return err } - return p.writeMessage(localVerMsg) + return p.writeMessage(localVerMsg, wire.LatestEncoding) } // negotiateInboundProtocol waits to receive a version message from the peer @@ -2062,6 +2092,7 @@ func newPeerBase(origCfg *Config, inbound bool) *Peer { p := Peer{ inbound: inbound, + wireEncoding: wire.BaseEncoding, knownInventory: newMruInventoryMap(maxKnownInventory), stallControl: make(chan stallControlMsg, 1), // nonblocking sync outputQueue: make(chan outMsg, outputBufferSize),