diff --git a/addrmanager.go b/addrmanager.go index 7f338104..f8bbcbb1 100644 --- a/addrmanager.go +++ b/addrmanager.go @@ -153,7 +153,7 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) { // Enforce max addresses. if len(a.addrNew[bucket]) > newBucketSize { - log.Tracef("[AMGR] new bucket is full, expiring old ") + log.Tracef("AMGR: new bucket is full, expiring old ") a.expireNew(bucket) } @@ -161,7 +161,7 @@ func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) { ka.refs++ a.addrNew[bucket][addr] = ka - log.Tracef("[AMGR] Added new address %s for a total of %d addresses", + log.Tracef("AMGR: Added new address %s for a total of %d addresses", addr, a.nTried+a.nNew) } @@ -259,7 +259,7 @@ func (a *AddrManager) expireNew(bucket int) { var oldest *knownAddress for k, v := range a.addrNew[bucket] { if bad(v) { - log.Tracef("[AMGR] expiring bad address %v", k) + log.Tracef("AMGR: expiring bad address %v", k) delete(a.addrNew[bucket], k) a.nNew-- v.refs-- @@ -277,7 +277,7 @@ func (a *AddrManager) expireNew(bucket int) { if oldest != nil { key := NetAddressKey(oldest.na) - log.Tracef("[AMGR] expiring oldest address %v", key) + log.Tracef("AMGR: expiring oldest address %v", key) delete(a.addrNew[bucket], key) a.nNew-- @@ -393,7 +393,7 @@ out: dumpAddressTicker.Stop() a.savePeers() a.wg.Done() - log.Trace("[AMGR] Address handler done") + log.Trace("AMGR: Address handler done") } type serialisedKnownAddress struct { @@ -484,7 +484,7 @@ func (a *AddrManager) loadPeers() { err := a.deserialisePeers(filePath) if err != nil { - log.Errorf("[AMGR] Failed to parse %s: %v", filePath, + log.Errorf("AMGR: Failed to parse %s: %v", filePath, err) // if it is invalid we nuke the old one unconditionally. err = os.Remove(filePath) @@ -495,7 +495,7 @@ func (a *AddrManager) loadPeers() { a.reset() return } - log.Infof("[AMGR] Loaded %d addresses from '%s'", a.nNew+a.nTried, + log.Infof("AMGR: Loaded %d addresses from '%s'", a.nNew+a.nTried, filePath) } @@ -610,7 +610,7 @@ func (a *AddrManager) Start() { return } - log.Trace("[AMGR] Starting address manager") + log.Trace("AMGR: Starting address manager") a.wg.Add(1) @@ -624,12 +624,12 @@ func (a *AddrManager) Start() { // Stop gracefully shuts down the address manager by stopping the main handler. func (a *AddrManager) Stop() error { if atomic.AddInt32(&a.shutdown, 1) != 1 { - log.Warnf("[AMGR] Address manager is already in the process of " + + log.Warnf("AMGR: Address manager is already in the process of " + "shutting down") return nil } - log.Infof("[AMGR] Address manager shutting down") + log.Infof("AMGR: Address manager shutting down") close(a.quit) a.wg.Wait() return nil @@ -659,7 +659,7 @@ func (a *AddrManager) AddAddressByIP(addrIP string) { // Split IP and port addr, portStr, err := net.SplitHostPort(addrIP) if err != nil { - log.Warnf("[AMGR] AddADddressByIP given bullshit adddress"+ + log.Warnf("AMGR: AddADddressByIP given bullshit adddress"+ "(%s): %v", err) return } @@ -668,12 +668,12 @@ func (a *AddrManager) AddAddressByIP(addrIP string) { na.Timestamp = time.Now() na.IP = net.ParseIP(addr) if na.IP == nil { - log.Error("[AMGR] Invalid ip address:", addr) + log.Error("AMGR: Invalid ip address:", addr) return } port, err := strconv.ParseUint(portStr, 10, 0) if err != nil { - log.Error("[AMGR] Invalid port: ", portStr, err) + log.Error("AMGR: Invalid port: ", portStr, err) return } na.Port = uint16(port) @@ -808,7 +808,7 @@ func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress { ka := e.Value.(*knownAddress) randval := a.rand.Intn(large) if float64(randval) < (factor * chance(ka) * float64(large)) { - log.Tracef("[AMGR] Selected %v from tried "+ + log.Tracef("AMGR: Selected %v from tried "+ "bucket", NetAddressKey(ka.na)) return ka } @@ -836,7 +836,7 @@ func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress { } randval := a.rand.Intn(large) if float64(randval) < (factor * chance(ka) * float64(large)) { - log.Tracef("[AMGR] Selected %v from new bucket", + log.Tracef("AMGR: Selected %v from new bucket", NetAddressKey(ka.na)) return ka } @@ -971,7 +971,7 @@ func (a *AddrManager) Good(addr *btcwire.NetAddress) { a.nNew++ rmkey := NetAddressKey(rmka.na) - log.Tracef("[AMGR] replacing %s with %s in tried", rmkey, addrKey) + log.Tracef("AMGR: replacing %s with %s in tried", rmkey, addrKey) // We made sure there is space here just above. a.addrNew[newBucket][rmkey] = rmka diff --git a/blockmanager.go b/blockmanager.go index a201d6fa..a154868c 100644 --- a/blockmanager.go +++ b/blockmanager.go @@ -91,7 +91,7 @@ func (b *blockManager) startSync(peers *list.List) { // Find the height of the current known best block. _, height, err := b.server.db.NewestSha() if err != nil { - log.Errorf("[BMGR] %v", err) + log.Errorf("BMGR: %v", err) return } @@ -121,17 +121,17 @@ func (b *blockManager) startSync(peers *list.List) { if bestPeer != nil { locator, err := b.blockChain.LatestBlockLocator() if err != nil { - log.Errorf("[BMGR] Failed to get block locator for the "+ + log.Errorf("BMGR: Failed to get block locator for the "+ "latest block: %v", err) return } - log.Infof("[BMGR] Syncing to block height %d from peer %v", + log.Infof("BMGR: Syncing to block height %d from peer %v", bestPeer.lastBlock, bestPeer.addr) bestPeer.PushGetBlocksMsg(locator, &zeroHash) b.syncPeer = bestPeer } else { - log.Warnf("[BMGR] No sync peer candidates available") + log.Warnf("BMGR: No sync peer candidates available") } } @@ -172,7 +172,7 @@ func (b *blockManager) handleNewPeerMsg(peers *list.List, p *peer) { return } - log.Infof("[BMGR] New valid peer %s", p) + log.Infof("BMGR: New valid peer %s", p) // Ignore the peer if it's not a sync candidate. if !b.isSyncCandidate(p) { @@ -199,7 +199,7 @@ func (b *blockManager) handleDonePeerMsg(peers *list.List, p *peer) { } } - log.Infof("[BMGR] Lost peer %s", p) + log.Infof("BMGR: Lost peer %s", p) // Remove requested transactions from the global map so that they will // be fetched from elsewhere next time we get an inv. @@ -236,6 +236,10 @@ func (b *blockManager) logBlockHeight(numTx, height int64) { return } + // Truncated the duration to 10s of milliseconds. + durationMillis := int64(duration / time.Millisecond) + tDuration := 10 * time.Millisecond * time.Duration(durationMillis/10) + // Log information about new block height. blockStr := "blocks" if b.receivedLogBlocks == 1 { @@ -245,9 +249,9 @@ func (b *blockManager) logBlockHeight(numTx, height int64) { if b.receivedLogTx == 1 { txStr = "transaction" } - log.Infof("[BMGR] Processed %d %s (%d %s) in the last %s - Block "+ + log.Infof("BMGR: Processed %d %s (%d %s) in the last %s - Block "+ "height %d", b.receivedLogBlocks, blockStr, b.receivedLogTx, - txStr, duration, height) + txStr, tDuration, height) b.receivedLogBlocks = 0 b.receivedLogTx = 0 @@ -261,7 +265,7 @@ func (b *blockManager) handleTxMsg(tmsg *txMsg) { // If we didn't ask for this block then the peer is misbehaving. if _, ok := tmsg.peer.requestedTxns[txHash]; !ok { - log.Warnf("[BMGR] Got unrequested transaction %v from %s -- "+ + log.Warnf("BMGR: Got unrequested transaction %v from %s -- "+ "disconnecting", &txHash, tmsg.peer.addr) tmsg.peer.Disconnect() return @@ -306,7 +310,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // mode in this case so the chain code is actually fed the // duplicate blocks. if !cfg.RegressionTest { - log.Warnf("[BMGR] Got unrequested block %v from %s -- "+ + log.Warnf("BMGR: Got unrequested block %v from %s -- "+ "disconnecting", blockSha, bmsg.peer.addr) bmsg.peer.Disconnect() return @@ -332,9 +336,9 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // it as such. Otherwise, something really did go wrong, so log // it as an actual error. if _, ok := err.(btcchain.RuleError); ok { - log.Infof("[BMGR] Rejected block %v: %v", blockSha, err) + log.Infof("BMGR: Rejected block %v: %v", blockSha, err) } else { - log.Errorf("[BMGR] Failed to process block %v: %v", blockSha, err) + log.Errorf("BMGR: Failed to process block %v: %v", blockSha, err) } return } @@ -348,7 +352,7 @@ func (b *blockManager) handleBlockMsg(bmsg *blockMsg) { // Log info about the new block height. _, height, err := b.server.db.NewestSha() if err != nil { - log.Warnf("[BMGR] Failed to obtain latest sha - %v", err) + log.Warnf("BMGR: Failed to obtain latest sha - %v", err) return } b.logBlockHeight(int64(len(bmsg.block.MsgBlock().Transactions)), height) @@ -449,7 +453,7 @@ func (b *blockManager) handleInvMsg(imsg *invMsg) { orphanRoot := chain.GetOrphanRoot(&iv.Hash) locator, err := chain.LatestBlockLocator() if err != nil { - log.Errorf("[PEER] Failed to get block "+ + log.Errorf("PEER: Failed to get block "+ "locator for the latest block: "+ "%v", err) continue @@ -551,7 +555,7 @@ out: } } b.wg.Done() - log.Trace("[BMGR] Block handler done") + log.Trace("BMGR: Block handler done") } // handleNotifyMsg handles notifications from btcchain. It does things such @@ -566,7 +570,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { orphanRoot := b.blockChain.GetOrphanRoot(orphanHash) locator, err := b.blockChain.LatestBlockLocator() if err != nil { - log.Errorf("[BMGR] Failed to get block locator "+ + log.Errorf("BMGR: Failed to get block locator "+ "for the latest block: %v", err) break } @@ -588,7 +592,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { block, ok := notification.Data.(*btcutil.Block) if !ok { - log.Warnf("[BMGR] Chain accepted notification is not a block.") + log.Warnf("BMGR: Chain accepted notification is not a block.") break } @@ -604,7 +608,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { case btcchain.NTBlockConnected: block, ok := notification.Data.(*btcutil.Block) if !ok { - log.Warnf("[BMGR] Chain connected notification is not a block.") + log.Warnf("BMGR: Chain connected notification is not a block.") break } @@ -618,7 +622,7 @@ func (b *blockManager) handleNotifyMsg(notification *btcchain.Notification) { case btcchain.NTBlockDisconnected: block, ok := notification.Data.(*btcutil.Block) if !ok { - log.Warnf("[BMGR] Chain disconnected notification is not a block.") + log.Warnf("BMGR: Chain disconnected notification is not a block.") break } @@ -697,7 +701,7 @@ func (b *blockManager) Start() { return } - log.Trace("[BMGR] Starting block manager") + log.Trace("BMGR: Starting block manager") b.wg.Add(1) go b.blockHandler() } @@ -706,12 +710,12 @@ func (b *blockManager) Start() { // handlers and waiting for them to finish. func (b *blockManager) Stop() error { if atomic.AddInt32(&b.shutdown, 1) != 1 { - log.Warnf("[BMGR] Block manager is already in the process of " + + log.Warnf("BMGR: Block manager is already in the process of " + "shutting down") return nil } - log.Infof("[BMGR] Block manager shutting down") + log.Infof("BMGR: Block manager shutting down") close(b.quit) b.wg.Wait() return nil @@ -732,16 +736,16 @@ func newBlockManager(s *server) (*blockManager, error) { bm.blockChain = btcchain.New(s.db, s.btcnet, bm.handleNotifyMsg) bm.blockChain.DisableCheckpoints(cfg.DisableCheckpoints) if cfg.DisableCheckpoints { - log.Info("[BMGR] Checkpoints are disabled") + log.Info("BMGR: Checkpoints are disabled") } - log.Infof("[BMGR] Generating initial block node index. This may " + + log.Infof("BMGR: Generating initial block node index. This may " + "take a while...") err := bm.blockChain.GenerateInitialIndex() if err != nil { return nil, err } - log.Infof("[BMGR] Block index generation complete") + log.Infof("BMGR: Block index generation complete") return &bm, nil } @@ -757,7 +761,7 @@ func removeRegressionDB(dbPath string) error { // Remove the old regression test database if it already exists. fi, err := os.Stat(dbPath) if err == nil { - log.Infof("[BMGR] Removing regression test database from '%s'", dbPath) + log.Infof("BMGR: Removing regression test database from '%s'", dbPath) if fi.IsDir() { err := os.RemoveAll(dbPath) if err != nil { @@ -829,7 +833,7 @@ func loadBlockDB() (btcdb.Db, error) { // each run, so remove it now if it already exists. removeRegressionDB(dbPath) - log.Infof("[BMGR] Loading block database from '%s'", dbPath) + log.Infof("BMGR: Loading block database from '%s'", dbPath) db, err := btcdb.OpenDB(cfg.DbType, dbPath) if err != nil { // Return the error if it's not because the database doesn't @@ -865,11 +869,11 @@ func loadBlockDB() (btcdb.Db, error) { db.Close() return nil, err } - log.Infof("[BMGR] Inserted genesis block %v", + log.Infof("BMGR: Inserted genesis block %v", activeNetParams.genesisHash) height = 0 } - log.Infof("[BMGR] Block database loaded with block height %d", height) + log.Infof("BMGR: Block database loaded with block height %d", height) return db, nil } diff --git a/btcd.go b/btcd.go index fc56f0fc..7e412c13 100644 --- a/btcd.go +++ b/btcd.go @@ -44,7 +44,7 @@ func btcdMain() error { if cfg.Profile != "" { go func() { listenAddr := net.JoinHostPort("", cfg.Profile) - log.Infof("[BTCD] Profile server listening on %s", listenAddr) + log.Infof("BTCD: Profile server listening on %s", listenAddr) log.Errorf("%v", http.ListenAndServe(listenAddr, nil)) }() } diff --git a/log.go b/log.go index 9fdbe1fd..6a1c8d9f 100644 --- a/log.go +++ b/log.go @@ -9,8 +9,19 @@ import ( "github.com/conformal/btcchain" "github.com/conformal/btcdb" "github.com/conformal/btcscript" + "github.com/conformal/btcwire" "github.com/conformal/seelog" "os" + "time" +) + +const ( + // lockTimeThreshold is the number below which a lock time is + // interpreted to be a block number. Since an average of one block + // is generated per 10 minutes, this allows blocks for about 9,512 + // years. However, if the field is interpreted as a timestamp, given + // the lock time is a uint32, the max is sometime around 2106. + lockTimeThreshold uint32 = 5e8 // Tue Nov 5 00:53:20 1985 UTC ) var ( @@ -36,14 +47,16 @@ func newLogClosure(c func() string) logClosure { // newLogger creates a new seelog logger using the provided logging level and // log message prefix. func newLogger(level string, prefix string) seelog.LoggerInterface { + // + fmtstring := ` - + - + ` config := fmt.Sprintf(fmtstring, level, prefix) @@ -98,3 +111,117 @@ func directionString(inbound bool) string { } return "outbound" } + +// formatLockTime returns a transaction lock time as a human-readable string. +func formatLockTime(lockTime uint32) string { + // The lock time field of a transaction is either a block height at + // which the transaction is finalized or a timestamp depending on if the + // value is before the lockTimeThreshold. When it is under the + // threshold it is a block height. + if lockTime < lockTimeThreshold { + return fmt.Sprintf("height %d", lockTime) + } + + return time.Unix(int64(lockTime), 0).String() +} + +// invSummary returns an inventory messege as a human-readable string. +func invSummary(invList []*btcwire.InvVect) string { + // No inventory. + invLen := len(invList) + if invLen == 0 { + return "empty" + } + + // One inventory item. + if invLen == 1 { + iv := invList[0] + switch iv.Type { + case btcwire.InvTypeError: + return fmt.Sprintf("error %s", iv.Hash) + case btcwire.InvTypeBlock: + return fmt.Sprintf("block %s", iv.Hash) + case btcwire.InvTypeTx: + return fmt.Sprintf("tx %s", iv.Hash) + } + + return fmt.Sprintf("unknown (%d) %s", uint32(iv.Type), iv.Hash) + } + + // More than one inv item. + return fmt.Sprintf("size %d", invLen) +} + +// locatorSummary returns a block locator as a human-readable string. +func locatorSummary(locator []*btcwire.ShaHash, stopHash *btcwire.ShaHash) string { + if len(locator) > 0 { + return fmt.Sprintf("locator %s, stop %s", locator[0], stopHash) + } + + return fmt.Sprintf("no locator, stop %s", stopHash) + +} + +// messageSummary returns a human-readable string which summarizes a message. +// Not all messages have or need a summary. This is used for debug logging. +func messageSummary(msg btcwire.Message) string { + switch msg := msg.(type) { + case *btcwire.MsgVersion: + return fmt.Sprintf("agent %s, pver %d, block %d", + msg.UserAgent, msg.ProtocolVersion, msg.LastBlock) + + case *btcwire.MsgVerAck: + // No summary. + + case *btcwire.MsgGetAddr: + // No summary. + + case *btcwire.MsgAddr: + return fmt.Sprintf("%d addr", len(msg.AddrList)) + + case *btcwire.MsgPing: + // No summary - perhaps add nonce. + + case *btcwire.MsgPong: + // No summary - perhaps add nonce. + + case *btcwire.MsgAlert: + // No summary. + + case *btcwire.MsgMemPool: + // No summary. + + case *btcwire.MsgTx: + hash, _ := msg.TxSha() + return fmt.Sprintf("hash %s, %d inputs, %d outputs, lock %s", + hash, len(msg.TxIn), len(msg.TxOut), + formatLockTime(msg.LockTime)) + + case *btcwire.MsgBlock: + header := &msg.Header + hash, _ := msg.BlockSha() + return fmt.Sprintf("hash %s, ver %d, %d tx, %s", hash, + header.Version, header.TxnCount, header.Timestamp) + + case *btcwire.MsgInv: + return invSummary(msg.InvList) + + case *btcwire.MsgNotFound: + return invSummary(msg.InvList) + + case *btcwire.MsgGetData: + return invSummary(msg.InvList) + + case *btcwire.MsgGetBlocks: + return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop) + + case *btcwire.MsgGetHeaders: + return locatorSummary(msg.BlockLocatorHashes, &msg.HashStop) + + case *btcwire.MsgHeaders: + return fmt.Sprintf("num %d", len(msg.Headers)) + } + + // No summary for other messages. + return "" +} diff --git a/mempool.go b/mempool.go index ddd710d0..73f49573 100644 --- a/mempool.go +++ b/mempool.go @@ -372,7 +372,7 @@ func (mp *txMemPool) addOrphan(tx *btcwire.MsgTx, txHash *btcwire.ShaHash) { mp.orphansByPrev[originTxHash].PushBack(tx) } - log.Debugf("[TXMP] Stored orphan transaction %v (total: %d)", txHash, + log.Debugf("TXMP: Stored orphan transaction %v (total: %d)", txHash, len(mp.orphans)) } @@ -663,7 +663,7 @@ func (mp *txMemPool) maybeAcceptTransaction(tx *btcwire.MsgTx, isOrphan *bool) e mp.addTransaction(tx, &txHash) mp.lock.RLock() - log.Debugf("[TXMP] Accepted transaction %v (pool size: %v)", txHash, + log.Debugf("TXMP: Accepted transaction %v (pool size: %v)", txHash, len(mp.pool)) mp.lock.RUnlock() @@ -742,7 +742,7 @@ func (mp *txMemPool) ProcessTransaction(tx *btcwire.MsgTx) error { if err != nil { return err } - log.Tracef("[TXMP] Processing transaction %v", txHash) + log.Tracef("TXMP: Processing transaction %v", txHash) // Potentially accept the transaction to the memory pool. var isOrphan bool diff --git a/peer.go b/peer.go index 861759f5..2bf820f9 100644 --- a/peer.go +++ b/peer.go @@ -210,7 +210,7 @@ func (p *peer) pushVersionMsg() error { func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Detect self connections. if msg.Nonce == p.server.nonce { - log.Debugf("[PEER] Disconnecting peer connected to self %s", + log.Debugf("PEER: Disconnecting peer connected to self %s", p.addr) p.Disconnect() return @@ -218,7 +218,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Limit to one version message per peer. if p.versionKnown { - p.logError("[PEER] Only one version message per peer is allowed %s.", + p.logError("PEER: Only one version message per peer is allowed %s.", p.addr) p.Disconnect() return @@ -227,7 +227,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Negotiate the protocol version. p.protocolVersion = minUint32(p.protocolVersion, uint32(msg.ProtocolVersion)) p.versionKnown = true - log.Debugf("[PEER] Negotiated protocol version %d for peer %s", + log.Debugf("PEER: Negotiated protocol version %d for peer %s", p.protocolVersion, p.addr) p.lastBlock = msg.LastBlock @@ -240,7 +240,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Send version. err := p.pushVersionMsg() if err != nil { - p.logError("[PEER] Can't send version message: %v", err) + p.logError("PEER: Can't send version message: %v", err) p.Disconnect() return } @@ -249,7 +249,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Set up a NetAddress for the peer to be used with AddrManager. na, err := newNetAddress(p.conn.RemoteAddr(), p.services) if err != nil { - p.logError("[PEER] Can't get remote address: %v", err) + p.logError("PEER: Can't get remote address: %v", err) p.Disconnect() return } @@ -266,7 +266,7 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Advertise the local address. na, err := newNetAddress(p.conn.LocalAddr(), p.services) if err != nil { - p.logError("[PEER] Can't advertise local "+ + p.logError("PEER: Can't advertise local "+ "address: %v", err) p.Disconnect() return @@ -328,7 +328,7 @@ func (p *peer) pushBlockMsg(sha btcwire.ShaHash) error { blk, err := p.server.db.FetchBlockBySha(&sha) if err != nil { - log.Tracef("[PEER] Unable to fetch requested block sha %v: %v", + log.Tracef("PEER: Unable to fetch requested block sha %v: %v", &sha, err) return err } @@ -371,7 +371,7 @@ func (p *peer) PushGetBlocksMsg(locator btcchain.BlockLocator, stopHash *btcwire beginHash != nil && stopHash.IsEqual(p.prevGetBlocksStop) && beginHash.IsEqual(p.prevGetBlocksBegin) { - log.Tracef("[PEER] Filtering duplicate [getblocks] with begin "+ + log.Tracef("PEER: Filtering duplicate [getblocks] with begin "+ "hash %v, stop hash %v", beginHash, stopHash) return nil } @@ -493,7 +493,7 @@ out: case btcwire.InvTypeBlock: err = p.pushBlockMsg(iv.Hash) default: - log.Warnf("[PEER] Unknown type in inventory request %d", + log.Warnf("PEER: Unknown type in inventory request %d", iv.Type) break out } @@ -552,7 +552,7 @@ func (p *peer) handleGetBlocksMsg(msg *btcwire.MsgGetBlocks) { // Fetch the inventory from the block database. hashList, err := p.server.db.FetchHeightRange(start, endIdx) if err != nil { - log.Warnf("[PEER] Block lookup failed: %v", err) + log.Warnf("PEER: Block lookup failed: %v", err) return } @@ -646,7 +646,7 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { // Fetch the inventory from the block database. hashList, err := p.server.db.FetchHeightRange(start, endIdx) if err != nil { - log.Warnf("[PEER] Header lookup failed: %v", err) + log.Warnf("PEER: Header lookup failed: %v", err) return } @@ -660,7 +660,7 @@ func (p *peer) handleGetHeadersMsg(msg *btcwire.MsgGetHeaders) { for _, hash := range hashList { block, err := p.server.db.FetchBlockBySha(&hash) if err != nil { - log.Warnf("[PEER] Lookup of known block hash "+ + log.Warnf("PEER: Lookup of known block hash "+ "failed: %v", err) continue } @@ -686,7 +686,7 @@ func (p *peer) handleGetAddrMsg(msg *btcwire.MsgGetAddr) { // Push the addresses. err := p.pushAddrMsg(addrCache) if err != nil { - p.logError("[PEER] Can't push address message: %v", err) + p.logError("PEER: Can't push address message: %v", err) p.Disconnect() return } @@ -739,7 +739,7 @@ func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // A message that has no addresses is invalid. if len(msg.AddrList) == 0 { - p.logError("[PEER] Command [%s] from %s does not contain any addresses", + p.logError("PEER: Command [%s] from %s does not contain any addresses", msg.Command(), p.addr) p.Disconnect() return @@ -789,16 +789,23 @@ func (p *peer) readMessage() (msg btcwire.Message, buf []byte, err error) { if err != nil { return } - log.Debugf("[PEER] Received command [%v] from %s", msg.Command(), - p.addr) // Use closures to log expensive operations so they are only run when // the logging level requires it. - log.Tracef("%v", newLogClosure(func() string { - return "[PEER] " + spew.Sdump(msg) + log.Debugf("%v", newLogClosure(func() string { + // Debug summary of message. + summary := messageSummary(msg) + if len(summary) > 0 { + summary = " (" + summary + ")" + } + return fmt.Sprintf("PEER: Received %v%s from %s", + msg.Command(), summary, p.addr) })) log.Tracef("%v", newLogClosure(func() string { - return "[PEER] " + spew.Sdump(buf) + return "PEER: " + spew.Sdump(msg) + })) + log.Tracef("%v", newLogClosure(func() string { + return "PEER: " + spew.Sdump(buf) })) return @@ -811,13 +818,19 @@ func (p *peer) writeMessage(msg btcwire.Message) { return } - log.Debugf("[PEER] Sending command [%v] to %s", msg.Command(), - p.addr) - - // Use closures to log expensive operations so they are only run when the - // logging level requires it. + // Use closures to log expensive operations so they are only run when + // the logging level requires it. + log.Debugf("%v", newLogClosure(func() string { + // Debug summary of message. + summary := messageSummary(msg) + if len(summary) > 0 { + summary = " (" + summary + ")" + } + return fmt.Sprintf("PEER: Sending %v%s to %s", msg.Command(), + summary, p.addr) + })) log.Tracef("%v", newLogClosure(func() string { - return "[PEER] " + spew.Sdump(msg) + return "PEER: " + spew.Sdump(msg) })) log.Tracef("%v", newLogClosure(func() string { var buf bytes.Buffer @@ -825,14 +838,14 @@ func (p *peer) writeMessage(msg btcwire.Message) { if err != nil { return err.Error() } - return "[PEER] " + spew.Sdump(buf.Bytes()) + return "PEER: " + spew.Sdump(buf.Bytes()) })) // Write the message to the peer. err := btcwire.WriteMessage(p.conn, msg, p.protocolVersion, p.btcnet) if err != nil { p.Disconnect() - p.logError("[PEER] Can't send message: %v", err) + p.logError("PEER: Can't send message: %v", err) return } } @@ -875,21 +888,21 @@ out: // regression test mode and the error is one of the // allowed errors. if cfg.RegressionTest && p.isAllowedByRegression(err) { - log.Errorf("[PEER] Allowed regression test "+ + log.Errorf("PEER: Allowed regression test "+ "error: %v", err) continue } // Only log the error if we're not forcibly disconnecting. if atomic.LoadInt32(&p.disconnect) == 0 { - p.logError("[PEER] Can't read message: %v", err) + p.logError("PEER: Can't read message: %v", err) } break out } // Ensure version message comes first. if _, ok := rmsg.(*btcwire.MsgVersion); !ok && !p.versionKnown { - p.logError("[PEER] A version message must precede all others") + p.logError("PEER: A version message must precede all others") break out } @@ -945,7 +958,7 @@ out: p.handleGetHeadersMsg(msg) default: - log.Debugf("[PEER] Received unhandled message of type %v: Fix Me", + log.Debugf("PEER: Received unhandled message of type %v: Fix Me", rmsg.Command()) } @@ -970,7 +983,7 @@ out: p.server.blockManager.DonePeer(p) } - log.Tracef("[PEER] Peer input handler done for %s", p.addr) + log.Tracef("PEER: Peer input handler done for %s", p.addr) } // outHandler handles all outgoing messages for the peer. It must be run as a @@ -1025,7 +1038,7 @@ out: break out } } - log.Tracef("[PEER] Peer output handler done for %s", p.addr) + log.Tracef("PEER: Peer output handler done for %s", p.addr) } // QueueMessage adds the passed bitcoin message to the peer send queue. It @@ -1076,13 +1089,13 @@ func (p *peer) Start() error { return nil } - log.Tracef("[PEER] Starting peer %s", p.addr) + log.Tracef("PEER: Starting peer %s", p.addr) // Send an initial version message if this is an outbound connection. if !p.inbound { err := p.pushVersionMsg() if err != nil { - p.logError("[PEER] Can't send outbound version "+ + p.logError("PEER: Can't send outbound version "+ "message %v", err) p.Disconnect() return err @@ -1098,7 +1111,7 @@ func (p *peer) Start() error { // Shutdown gracefully shuts down the peer by disconnecting it. func (p *peer) Shutdown() { - log.Tracef("[PEER] Shutdown peer %s", p.addr) + log.Tracef("PEER: Shutdown peer %s", p.addr) p.Disconnect() } @@ -1189,11 +1202,11 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { // this is a persistent connection, retry after the retry // interval. for atomic.LoadInt32(&p.disconnect) == 0 { - log.Debugf("[SRVR] Attempting to connect to %s", faddr) + log.Debugf("SRVR: Attempting to connect to %s", faddr) conn, err := dial("tcp", addr) if err != nil { p.retrycount += 1 - log.Debugf("[SRVR] Failed to connect to %s: %v", + log.Debugf("SRVR: Failed to connect to %s: %v", faddr, err) if !persistent { p.server.donePeers <- p @@ -1201,7 +1214,7 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { } scaledInterval := connectionRetryInterval.Nanoseconds() * p.retrycount / 2 scaledDuration := time.Duration(scaledInterval) - log.Debugf("[SRVR] Retrying connection to %s "+ + log.Debugf("SRVR: Retrying connection to %s "+ "in %s", faddr, scaledDuration) time.Sleep(scaledDuration) continue @@ -1214,7 +1227,7 @@ func newOutboundPeer(s *server, addr string, persistent bool) *peer { p.server.addrManager.Attempt(p.na) // Connection was successful so log it and start peer. - log.Debugf("[SRVR] Connected to %s", + log.Debugf("SRVR: Connected to %s", conn.RemoteAddr()) p.conn = conn atomic.AddInt32(&p.connected, 1) diff --git a/rpcserver.go b/rpcserver.go index b05dc17b..447cee2d 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -42,7 +42,7 @@ func (s *rpcServer) Start() { return } - log.Trace("[RPCS] Starting RPC server") + log.Trace("RPCS: Starting RPC server") http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { login := s.username + ":" + s.password auth := "Basic " + base64.StdEncoding.EncodeToString([]byte(login)) @@ -50,7 +50,7 @@ func (s *rpcServer) Start() { if len(authhdr) > 0 && authhdr[0] == auth { jsonRPCRead(w, r, s) } else { - log.Warnf("[RPCS] Auth failure.") + log.Warnf("RPCS: Auth failure.") jsonAuthFail(w, r, s) } }) @@ -58,9 +58,9 @@ func (s *rpcServer) Start() { for _, listener := range s.listeners { s.wg.Add(1) go func(listener net.Listener) { - log.Infof("[RPCS] RPC server listening on %s", listener.Addr()) + log.Infof("RPCS: RPC server listening on %s", listener.Addr()) httpServer.Serve(listener) - log.Tracef("[RPCS] RPC listener done for %s", listener.Addr()) + log.Tracef("RPCS: RPC listener done for %s", listener.Addr()) s.wg.Done() }(listener) } @@ -69,18 +69,18 @@ func (s *rpcServer) Start() { // Stop is used by server.go to stop the rpc listener. func (s *rpcServer) Stop() error { if atomic.AddInt32(&s.shutdown, 1) != 1 { - log.Infof("[RPCS] RPC server is already in the process of shutting down") + log.Infof("RPCS: RPC server is already in the process of shutting down") return nil } - log.Warnf("[RPCS] RPC server shutting down") + log.Warnf("RPCS: RPC server shutting down") for _, listener := range s.listeners { err := listener.Close() if err != nil { - log.Errorf("[RPCS] Problem shutting down rpc: %v", err) + log.Errorf("RPCS: Problem shutting down rpc: %v", err) return err } } - log.Infof("[RPCS] RPC server shutdown complete") + log.Infof("RPCS: RPC server shutdown complete") s.wg.Wait() return nil } @@ -100,7 +100,7 @@ func newRPCServer(s *server) (*rpcServer, error) { listenAddr4 := net.JoinHostPort("127.0.0.1", rpc.rpcport) listener4, err := net.Listen("tcp4", listenAddr4) if err != nil { - log.Errorf("[RPCS] Couldn't create listener: %v", err) + log.Errorf("RPCS: Couldn't create listener: %v", err) return nil, err } listeners = append(listeners, listener4) @@ -109,7 +109,7 @@ func newRPCServer(s *server) (*rpcServer, error) { listenAddr6 := net.JoinHostPort("::1", rpc.rpcport) listener6, err := net.Listen("tcp6", listenAddr6) if err != nil { - log.Errorf("[RPCS] Couldn't create listener: %v", err) + log.Errorf("RPCS: Couldn't create listener: %v", err) return nil, err } listeners = append(listeners, listener6) @@ -134,13 +134,13 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { var rawReply btcjson.Reply body, err := btcjson.GetRaw(r.Body) if err != nil { - log.Errorf("[RPCS] Error getting json message: %v", err) + log.Errorf("RPCS: Error getting json message: %v", err) return } var message btcjson.Message err = json.Unmarshal(body, &message) if err != nil { - log.Errorf("[RPCS] Error unmarshalling json message: %v", err) + log.Errorf("RPCS: Error unmarshalling json message: %v", err) jsonError := btcjson.Error{ Code: -32700, Message: "Parse error", @@ -151,7 +151,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: nil, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) msg, err := btcjson.MarshallAndSend(rawReply, w) if err != nil { log.Errorf(msg) @@ -161,7 +161,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { return } - log.Tracef("[RPCS] received: %v", message) + log.Tracef("RPCS: received: %v", message) // Deal with commands switch message.Method { @@ -225,7 +225,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: &message.Id, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) break } @@ -251,7 +251,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { } sha, err := btcwire.NewShaHashFromStr(hash) if err != nil { - log.Errorf("[RPCS] Error generating sha: %v", err) + log.Errorf("RPCS: Error generating sha: %v", err) jsonError := btcjson.Error{ Code: -5, Message: "Block not found", @@ -262,12 +262,12 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: &message.Id, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) break } blk, err := s.server.db.FetchBlockBySha(sha) if err != nil { - log.Errorf("[RPCS] Error fetching sha: %v", err) + log.Errorf("RPCS: Error fetching sha: %v", err) jsonError := btcjson.Error{ Code: -5, Message: "Block not found", @@ -278,13 +278,13 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: &message.Id, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) break } idx := blk.Height() buf, err := blk.Bytes() if err != nil { - log.Errorf("[RPCS] Error fetching block: %v", err) + log.Errorf("RPCS: Error fetching block: %v", err) jsonError := btcjson.Error{ Code: -5, Message: "Block not found", @@ -295,7 +295,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: &message.Id, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) break } @@ -308,7 +308,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { _, maxidx, err := s.server.db.NewestSha() if err != nil { - log.Errorf("[RPCS] Cannot get newest sha: %v", err) + log.Errorf("RPCS: Cannot get newest sha: %v", err) return } @@ -332,7 +332,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { if idx < maxidx { shaNext, err := s.server.db.FetchBlockShaByHeight(int64(idx + 1)) if err != nil { - log.Errorf("[RPCS] No next block: %v", err) + log.Errorf("RPCS: No next block: %v", err) } else { blockReply.NextHash = shaNext.String() } @@ -373,7 +373,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { var txS *btcwire.MsgTx txList, err := s.server.db.FetchTxBySha(txSha) if err != nil { - log.Errorf("[RPCS] Error fetching tx: %v", err) + log.Errorf("RPCS: Error fetching tx: %v", err) jsonError := btcjson.Error{ Code: -5, Message: "No information available about transaction", @@ -384,7 +384,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: &message.Id, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) break } @@ -393,7 +393,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { blksha := txList[lastTx].BlkSha blk, err := s.server.db.FetchBlockBySha(blksha) if err != nil { - log.Errorf("[RPCS] Error fetching sha: %v", err) + log.Errorf("RPCS: Error fetching sha: %v", err) jsonError := btcjson.Error{ Code: -5, Message: "Block not found", @@ -404,7 +404,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { Error: &jsonError, Id: &message.Id, } - log.Tracef("[RPCS] reply: %v", rawReply) + log.Tracef("RPCS: reply: %v", rawReply) break } idx := blk.Height() @@ -431,7 +431,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { voutList[i].ScriptPubKey.ReqSig = strings.Count(isbuf, "OP_CHECKSIG") _, addrhash, err := btcscript.ScriptToAddrHash(v.PkScript) if err != nil { - log.Errorf("[RPCS] Error getting address hash for %v: %v", txSha, err) + log.Errorf("RPCS: Error getting address hash for %v: %v", txSha, err) } if addr, err := btcutil.EncodeAddress(addrhash, s.server.btcnet); err != nil { addrList := make([]string, 1) @@ -442,7 +442,7 @@ func jsonRPCRead(w http.ResponseWriter, r *http.Request, s *rpcServer) { _, maxidx, err := s.server.db.NewestSha() if err != nil { - log.Errorf("[RPCS] Cannot get newest sha: %v", err) + log.Errorf("RPCS: Cannot get newest sha: %v", err) return } confirmations := uint64(1 + maxidx - idx) @@ -523,7 +523,7 @@ func getDifficultyRatio(bits uint32) float64 { outString := difficulty.FloatString(2) diff, err := strconv.ParseFloat(outString, 64) if err != nil { - log.Errorf("[RPCS] Cannot get difficulty: %v", err) + log.Errorf("RPCS: Cannot get difficulty: %v", err) return 0 } return diff diff --git a/server.go b/server.go index 073a77dc..dc3046f8 100644 --- a/server.go +++ b/server.go @@ -76,7 +76,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, // Ignore new peers if we're shutting down. if atomic.LoadInt32(&s.shutdown) != 0 { - log.Infof("[SRVR] New peer %s ignored - server is shutting "+ + log.Infof("SRVR: New peer %s ignored - server is shutting "+ "down", p) p.Shutdown() return false @@ -85,19 +85,19 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, // Disconnect banned peers. host, _, err := net.SplitHostPort(p.addr) if err != nil { - log.Debugf("[SRVR] can't split hostport %v", err) + log.Debugf("SRVR: can't split hostport %v", err) p.Shutdown() return false } if banEnd, ok := banned[host]; ok { if time.Now().Before(banEnd) { - log.Debugf("[SRVR] Peer %s is banned for another %v - "+ + log.Debugf("SRVR: Peer %s is banned for another %v - "+ "disconnecting", host, banEnd.Sub(time.Now())) p.Shutdown() return false } - log.Infof("[SRVR] Peer %s is no longer banned", host) + log.Infof("SRVR: Peer %s is no longer banned", host) delete(banned, host) } @@ -105,7 +105,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, // Limit max number of total peers. if peers.Len() >= cfg.MaxPeers { - log.Infof("[SRVR] Max peers reached [%d] - disconnecting "+ + log.Infof("SRVR: Max peers reached [%d] - disconnecting "+ "peer %s", cfg.MaxPeers, p) p.Shutdown() // TODO(oga) how to handle permanent peers here? @@ -114,7 +114,7 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, } // Add the new peer and start it. - log.Debugf("[SRVR] New peer %s", p) + log.Debugf("SRVR: New peer %s", p) peers.PushBack(p) if p.inbound { p.Start() @@ -136,11 +136,11 @@ func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { return false } peers.Remove(e) - log.Debugf("[SRVR] Removed peer %s", p) + log.Debugf("SRVR: Removed peer %s", p) return true } } - log.Warnf("[SRVR] Lost peer %v that we never had!", p) + log.Warnf("SRVR: Lost peer %v that we never had!", p) return false } @@ -149,11 +149,11 @@ func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { host, _, err := net.SplitHostPort(p.addr) if err != nil { - log.Debugf("[SRVR] can't split ban peer %s %v", p.addr, err) + log.Debugf("SRVR: can't split ban peer %s %v", p.addr, err) return } direction := directionString(p.inbound) - log.Infof("[SRVR] Banned peer %s (%s) for %v", host, direction, + log.Infof("SRVR: Banned peer %s (%s) for %v", host, direction, cfg.BanDuration) banned[host] = time.Now().Add(cfg.BanDuration) @@ -201,13 +201,13 @@ func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { // listenHandler is the main listener which accepts incoming connections for the // server. It must be run as a goroutine. func (s *server) listenHandler(listener net.Listener) { - log.Infof("[SRVR] Server listening on %s", listener.Addr()) + log.Infof("SRVR: Server listening on %s", listener.Addr()) for atomic.LoadInt32(&s.shutdown) == 0 { conn, err := listener.Accept() if err != nil { // Only log the error if we're not forcibly shutting down. if atomic.LoadInt32(&s.shutdown) == 0 { - log.Errorf("[SRVR] can't accept connection: %v", + log.Errorf("SRVR: can't accept connection: %v", err) } continue @@ -215,7 +215,7 @@ func (s *server) listenHandler(listener net.Listener) { s.AddPeer(newInboundPeer(s, conn)) } s.wg.Done() - log.Tracef("[SRVR] Listener handler done for %s", listener.Addr()) + log.Tracef("SRVR: Listener handler done for %s", listener.Addr()) } // seedFromDNS uses DNS seeding to populate the address manager with peers. @@ -271,7 +271,7 @@ func (s *server) peerHandler() { s.addrManager.Start() s.blockManager.Start() - log.Tracef("[SRVR] Starting peer handler") + log.Tracef("SRVR: Starting peer handler") peers := list.New() bannedPeers := make(map[string]time.Time) outboundPeers := 0 @@ -428,7 +428,7 @@ out: s.blockManager.Stop() s.addrManager.Stop() s.wg.Done() - log.Tracef("[SRVR] Peer handler done") + log.Tracef("SRVR: Peer handler done") } // AddPeer adds a new peer that has already been connected to the server. @@ -463,7 +463,7 @@ func (s *server) Start() { return } - log.Trace("[SRVR] Starting server") + log.Trace("SRVR: Starting server") // Start all the listeners. There will not be any if listening is // disabled. @@ -488,11 +488,11 @@ func (s *server) Start() { func (s *server) Stop() error { // Make sure this only happens once. if atomic.AddInt32(&s.shutdown, 1) != 1 { - log.Infof("[SRVR] Server is already in the process of shutting down") + log.Infof("SRVR: Server is already in the process of shutting down") return nil } - log.Warnf("[SRVR] Server shutting down") + log.Warnf("SRVR: Server shutting down") // Stop all the listeners. There will not be any listeners if // listening is disabled. @@ -516,7 +516,7 @@ func (s *server) Stop() error { // WaitForShutdown blocks until the main listener and peer handlers are stopped. func (s *server) WaitForShutdown() { s.wg.Wait() - log.Infof("[SRVR] Server shutdown complete") + log.Infof("SRVR: Server shutdown complete") } // ScheduleShutdown schedules a server shutdown after the specified duration. @@ -527,7 +527,7 @@ func (s *server) ScheduleShutdown(duration time.Duration) { if atomic.AddInt32(&s.shutdownSched, 1) != 1 { return } - log.Warnf("[SRVR] Server shutdown in %v", duration) + log.Warnf("SRVR: Server shutdown in %v", duration) go func() { remaining := duration tickDuration := dynamicTickDuration(remaining) @@ -553,7 +553,7 @@ func (s *server) ScheduleShutdown(duration time.Duration) { ticker.Stop() ticker = time.NewTicker(tickDuration) } - log.Warnf("[SRVR] Server shutdown in %v", remaining) + log.Warnf("SRVR: Server shutdown in %v", remaining) } } }()