diff --git a/addrmanager.go b/addrmanager.go index 8eb76400..e8a03a30 100644 --- a/addrmanager.go +++ b/addrmanager.go @@ -5,8 +5,13 @@ package main import ( + "container/list" + "encoding/json" "github.com/conformal/btcwire" + "math" + "math/rand" "net" + "os" "strconv" "sync" "time" @@ -17,53 +22,235 @@ const ( // address manager will track. maxAddresses = 2500 newAddressBufferSize = 50 - dumpAddressInterval = time.Minute * 2 + + // dumpAddressInterval is the interval used to dump the address + // cache to disk for future use. + dumpAddressInterval = time.Minute * 2 + + // triedBucketSize is the maximum number of addresses in each + // tried address bucket. + triedBucketSize = 64 + + // newBucketSize is the maximum number of addresses in each new address + // bucket. + newBucketSize = 64 + + // numMissingDays is the number of days before which we assume an + // address has vanished if we have not seen it announced in that long. + numMissingDays = 30 + + // numRetries is the number of tried without a single success before + // we assume an address is bad. + numRetries = 3 + + // maxFailures is the maximum number of failures we will accept without + // a success before considering an address bad. + maxFailures = 10 + + // minBadDays is the number of days since the last success before we + // will consider evicting an address. + minBadDays = 7 ) // updateAddress is a helper function to either update an address already known // to the address manager, or to add the address if not already known. -func updateAddress(a *AddrManager, netAddr *btcwire.NetAddress) { +func (a *AddrManager) updateAddress(netAddr, srcAddr *btcwire.NetAddress) { // Protect concurrent access. - a.addrCacheLock.Lock() - defer a.addrCacheLock.Unlock() + a.mtx.Lock() + defer a.mtx.Unlock() - // Update address if it already exists. - addr := NetAddressKey(netAddr) - if na, ok := a.addrCache[addr]; ok { + ka := a.find(netAddr) + if ka != nil { // Update the last seen time. - if netAddr.Timestamp.After(na.Timestamp) { - na.Timestamp = netAddr.Timestamp + if netAddr.Timestamp.After(ka.na.Timestamp) { + ka.na.Timestamp = netAddr.Timestamp } // Update services. - na.AddService(na.Services) + ka.na.AddService(netAddr.Services) - log.Tracef("[AMGR] Updated address manager address %s", addr) + log.Tracef("[AMGR] Updated address manager address %s", + NetAddressKey(netAddr)) return } // Enforce max addresses. - if len(a.addrCache)+1 > maxAddresses { - log.Tracef("[AMGR] Max addresses of %d reached", maxAddresses) - return + if len(a.addrNew) > newBucketSize { + log.Tracef("[AMGR] new bucket is full, expiring old ") + a.expireNew() } - a.addrCache[addr] = netAddr + addr := NetAddressKey(netAddr) + ka = &knownAddress{na: netAddr} + + // Fill in index. + a.addrIndex[addr] = ka + + // Add to new bucket. + a.addrNew[addr] = ka + log.Tracef("[AMGR] Added new address %s for a total of %d addresses", - addr, len(a.addrCache)) + addr, len(a.addrNew)+a.addrTried.Len()) +} + +// bad returns true if the address in question has not been tried in the last +// minute and meets one of the following +// criteria: +// 1) It claims to be from the future. +// 2) It hasn't been seen in over a month. +// 3) It has failed at least three times and never succeeded. +// 4) It has failed ten times in the last week. +// All addresses that meet these criteria are assumed to be worthless and not +// worth keeping hold of. +func bad(ka *knownAddress) bool { + if ka.lastattempt.After(time.Now().Add(-1 * time.Minute)) { + return false + + } + + // From the future? + if ka.na.Timestamp.After(time.Now().Add(10 * time.Minute)) { + return true + } + + // Over a month old? + if ka.na.Timestamp.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) { + return true + } + + // Never succeeded? + if ka.lastsuccess.IsZero() && ka.attempts >= numRetries { + return true + } + + // Hasn't succeeded in too long? + if !ka.lastsuccess.After(time.Now().Add(-1*minBadDays*time.Hour*24)) && + ka.attempts >= maxFailures { + return true + } + + return false +} + +// chance returns the selection probability for a known address. The priority +// depends upon how recent the address has been seen, how recent it was last +// attempted and how often attempts to connect to it have failed. +func chance(ka *knownAddress) float64 { + c := 1.0 + + now := time.Now() + var lastSeen float64 = 0.0 + var lastTry float64 = 0.0 + if !ka.na.Timestamp.After(now) { + var dur time.Duration + if ka.na.Timestamp.IsZero() { + // use unix epoch to match bitcoind. + dur = now.Sub(time.Unix(0, 0)) + + } else { + dur = now.Sub(ka.na.Timestamp) + } + lastSeen = dur.Seconds() + } + if !ka.lastattempt.After(now) { + var dur time.Duration + if ka.lastattempt.IsZero() { + // use unix epoch to match bitcoind. + dur = now.Sub(time.Unix(0, 0)) + } else { + dur = now.Sub(ka.lastattempt) + } + lastTry = dur.Seconds() + } + + c = 600.0 / (600.0 + lastSeen) + + // very recent attempts are less likely to be retried. + if lastTry > 60.0*10.0 { + c *= 0.01 + } + + // failed attempts deprioritise + if ka.attempts > 0 { + c /= (float64(ka.attempts) * 1.5) + } + + return c +} + +// expireNew makes space in the new buckets by expiring the really bad entries. +// If no bad entries are available we look at a few and remove the oldest. +func (a *AddrManager) expireNew() { + // First see if there are any entries that are so bad we can just throw + // them away. otherwise we throw away the oldest entry in the cache. + // Bitcoind here chooses four random and just throws the oldest of + // those away, but we keep track of oldest in the initial traversal and + // use that information instead + var oldest *knownAddress + for k, v := range a.addrNew { + if bad(v) { + log.Tracef("[AMGR] expiring bad address %v", k) + delete(a.addrIndex, k) + delete(a.addrNew, k) + return + } + if oldest == nil { + oldest = v + } else if !v.na.Timestamp.After(oldest.na.Timestamp) { + oldest = v + } + } + + if oldest != nil { + key := NetAddressKey(oldest.na) + log.Tracef("[AMGR] expiring oldest address %v", key) + + delete(a.addrIndex, key) + delete(a.addrNew, key) + } +} + +// pickTried selects an address from the tried bucket to be evicted. +// We just choose the eldest. +func (a *AddrManager) pickTried() *list.Element { + var oldest *knownAddress + var oldestElem *list.Element + for e := a.addrTried.Front(); e != nil; e = e.Next() { + ka := e.Value.(*knownAddress) + if oldest == nil || oldest.na.Timestamp.After(ka.na.Timestamp) { + oldestElem = e + oldest = ka + } + + } + return oldestElem +} + +type knownAddress struct { + na *btcwire.NetAddress + attempts int + lastattempt time.Time + lastsuccess time.Time + time time.Time + tried bool } // AddrManager provides a concurrency safe address manager for caching potential // peers on the bitcoin network. type AddrManager struct { - addrCache map[string]*btcwire.NetAddress - addrCacheLock sync.Mutex - started bool - shutdown bool - newAddresses chan []*btcwire.NetAddress - removeAddresses chan []*btcwire.NetAddress - wg sync.WaitGroup - quit chan bool + mtx sync.Mutex + rand *rand.Rand + addrIndex map[string]*knownAddress // address key to ka for all addrs. + addrNew map[string]*knownAddress + addrTried *list.List + started bool + shutdown bool + wg sync.WaitGroup + quit chan bool +} + +type JsonSave struct { + AddrList []string } // addressHandler is the main handler for the address manager. It must be run @@ -73,18 +260,13 @@ func (a *AddrManager) addressHandler() { out: for !a.shutdown { select { - case addrs := <-a.newAddresses: - for _, na := range addrs { - updateAddress(a, na) - } - case <-dumpAddressTicker.C: if !a.shutdown { - // TODO: Dump addresses to database. + a.savePeers() } case <-a.quit: - // TODO: Dump addresses to database. + a.savePeers() break out } } @@ -93,6 +275,63 @@ out: log.Trace("[AMGR] Address handler done") } +// savePeers saves all the known addresses to a file so they can be read back +// in at next run. +func (a *AddrManager) savePeers() { + // May give some way to specify this later. + filename := "peers.json" + + var toSave JsonSave + + list := a.AddressCacheFlat() + log.Info("LIST ", list) + toSave.AddrList = list + + w, err := os.Create(filename) + if err != nil { + log.Error("Error opening file: ", filename, err) + } + enc := json.NewEncoder(w) + defer w.Close() + enc.Encode(&toSave) + log.Info("Saving peer list.") +} + +// loadPeers loads the known address from the saved file. If empty, missing, or +// malformed file, just don't load anything and start fresh +func (a *AddrManager) loadPeers() { + log.Info("Loading saved peers") + + // May give some way to specify this later. + filename := "peers.json" + + _, err := os.Stat(filename) + if os.IsNotExist(err) { + log.Debugf("%s does not exist.\n", filename) + } else { + r, err := os.Open(filename) + if err != nil { + log.Error("Error opening file: ", filename, err) + return + } + defer r.Close() + + var inList JsonSave + dec := json.NewDecoder(r) + err = dec.Decode(&inList) + if err != nil { + log.Error("Error reading:", filename, err) + return + } + log.Debug("Adding ", len(inList.AddrList), " saved peers.") + if len(inList.AddrList) > 0 { + for _, ip := range inList.AddrList { + a.AddAddressByIP(ip) + } + } + } +} + // Start begins the core address handler which manages a pool of known // addresses, timeouts, and interval based writes. func (a *AddrManager) Start() { @@ -106,6 +345,9 @@ func (a *AddrManager) Start() { a.wg.Add(1) go a.addressHandler() a.started = true + + // Load peers we already know about from file. + a.loadPeers() } // Stop gracefully shuts down the address manager by stopping the main handler. @@ -117,6 +359,7 @@ func (a *AddrManager) Stop() error { } log.Infof("[AMGR] Address manager shutting down") + a.savePeers() a.shutdown = true a.quit <- true a.wg.Wait() @@ -126,50 +369,115 @@ func (a *AddrManager) Stop() error { // AddAddresses adds new addresses to the address manager. It enforces a max // number of addresses and silently ignores duplicate addresses. It is // safe for concurrent access. -func (a *AddrManager) AddAddresses(addrs []*btcwire.NetAddress) { - a.newAddresses <- addrs +func (a *AddrManager) AddAddresses(addrs []*btcwire.NetAddress, + srcAddr *btcwire.NetAddress) { + for _, na := range addrs { + // Filter out non-routable addresses. Note that non-routable + // also includes invalid and local addresses. + if Routable(na) { + a.updateAddress(na, srcAddr) + } + } } // AddAddress adds a new address to the address manager. It enforces a max // number of addresses and silently ignores duplicate addresses. It is // safe for concurrent access. -func (a *AddrManager) AddAddress(addr *btcwire.NetAddress) { - addrs := []*btcwire.NetAddress{addr} - a.newAddresses <- addrs +func (a *AddrManager) AddAddress(addr *btcwire.NetAddress, + srcAddr *btcwire.NetAddress) { + a.AddAddresses([]*btcwire.NetAddress{addr}, srcAddr) +} + +// AddAddressByIP adds an address where we are given an ip:port and not a +// btcwire.NetAddress. +func (a *AddrManager) AddAddressByIP(addrIP string) { + // Split IP and port + addr, portStr, err := net.SplitHostPort(addrIP) + if err != nil { + log.Warnf("[AMGR] AddADddressByIP given bullshit adddress"+ + "(%s): %v", err) + return + } + // Put it in btcwire.Netaddress + var na btcwire.NetAddress + na.Timestamp = time.Now() + na.IP = net.ParseIP(addr) + if na.IP == nil { + log.Error("Invalid ip address:", addr) + return + } + port, err := strconv.ParseUint(portStr, 10, 0) + if err != nil { + log.Error("Invalid port: ", portStr, err) + return + } + na.Port = uint16(port) + a.AddAddress(&na, &na) // XXX use correct src address } // NeedMoreAddresses returns whether or not the address manager needs more // addresses. func (a *AddrManager) NeedMoreAddresses() bool { - // Protect concurrent access. - a.addrCacheLock.Lock() - defer a.addrCacheLock.Unlock() + // NumAddresses handles concurrent access for us. - return len(a.addrCache)+1 <= maxAddresses + return a.NumAddresses()+1 <= maxAddresses } // NumAddresses returns the number of addresses known to the address manager. func (a *AddrManager) NumAddresses() int { - // Protect concurrent access. - a.addrCacheLock.Lock() - defer a.addrCacheLock.Unlock() + a.mtx.Lock() + defer a.mtx.Unlock() - return len(a.addrCache) + return len(a.addrNew) + a.addrTried.Len() } // AddressCache returns the current address cache. It must be treated as -// read-only. +// read-only (but since it is a copy now, this is not as dangerous). func (a *AddrManager) AddressCache() map[string]*btcwire.NetAddress { - return a.addrCache + allAddr := make(map[string]*btcwire.NetAddress) + + a.mtx.Lock() + defer a.mtx.Unlock() + for k, v := range a.addrNew { + allAddr[k] = v.na + } + + for e := a.addrTried.Front(); e != nil; e = e.Next() { + ka := e.Value.(*knownAddress) + allAddr[NetAddressKey(ka.na)] = ka.na + } + + return allAddr +} + +// AddressCacheFlat returns a flat list of strings with the current address +// cache. Just a copy, so one can do whatever they want to it. +func (a *AddrManager) AddressCacheFlat() []string { + var allAddr []string + + a.mtx.Lock() + defer a.mtx.Unlock() + for k, _ := range a.addrNew { + allAddr = append(allAddr, k) + } + + for e := a.addrTried.Front(); e != nil; e = e.Next() { + ka := e.Value.(*knownAddress) + allAddr = append(allAddr, NetAddressKey(ka.na)) + } + + return allAddr } // New returns a new bitcoin address manager. // Use Start to begin processing asynchronous address updates. func NewAddrManager() *AddrManager { am := AddrManager{ - addrCache: make(map[string]*btcwire.NetAddress), - newAddresses: make(chan []*btcwire.NetAddress, newAddressBufferSize), - quit: make(chan bool), + rand: rand.New(rand.NewSource(time.Now().UnixNano())), + addrIndex: make(map[string]*knownAddress), + addrNew: make(map[string]*knownAddress), + addrTried: list.New(), + quit: make(chan bool), } return &am } @@ -181,3 +489,351 @@ func NetAddressKey(na *btcwire.NetAddress) string { addr := net.JoinHostPort(na.IP.String(), port) return addr } + +// GetAddress returns a single address that should be routable. It picks a +// random one from the possible addresses with preference given to ones that +// have not been used recently and should not pick 'close' addresses +// consecutively. +func (a *AddrManager) GetAddress(class string, newBias int) *knownAddress { + // Protect concurrent access. + a.mtx.Lock() + defer a.mtx.Unlock() + + if newBias > 100 { + newBias = 100 + } + if newBias < 0 { + newBias = 0 + } + + // Bias 50% for now between new and tried. + triedCorrelation := math.Sqrt(float64(a.addrTried.Len())) * + (100.0 - float64(newBias)) + newCorrelation := math.Sqrt(float64(len(a.addrNew))) * float64(newBias) + + if (newCorrelation+triedCorrelation)*a.rand.Float64() < + triedCorrelation { + // Tried entry. + large := 1 << 30 + factor := 1.0 + for { + // Pick a random entry in the list + e := a.addrTried.Front() + for i := a.rand.Int63n(int64(a.addrTried.Len())); + i > 0; i-- { + e = e.Next() + } + ka := e.Value.(*knownAddress) + randval := a.rand.Intn(large) + if float64(randval) < (factor * chance(ka) * float64(large)) { + log.Tracef("[AMGR] Selected %v from tried "+ + "bucket", NetAddressKey(ka.na)) + return ka + } + factor *= 1.2 + } + } else { + // new node. + // XXX use a closure/function to avoid repeating this. + keyList := []string{} + for key := range a.addrNew { + keyList = append(keyList, key) + } + large := 1 << 30 + factor := 1.0 + for { + testKey := keyList[a.rand.Int63n(int64(len(keyList)))] + ka := a.addrNew[testKey] + randval := a.rand.Intn(large) + if float64(randval) < (factor * chance(ka) * float64(large)) { + log.Tracef("[AMGR] Selected %v from new bucket", + NetAddressKey(ka.na)) + return ka + } + factor *= 1.2 + } + } + return nil +} + +func (a *AddrManager) find(addr *btcwire.NetAddress) *knownAddress { + return a.addrIndex[NetAddressKey(addr)] +} + +/* + * Connected - updates the last seen time but only every 20 minutes. + * Good - last tried = last success = last seen = now. attmempts = 0. + * - move address to tried. + * Attempted - set last tried to time. nattempts++ + */ +func (a *AddrManager) Attempt(addr *btcwire.NetAddress) { + a.mtx.Lock() + defer a.mtx.Unlock() + + // find address. + // Surely address will be in tried by now? + ka := a.find(addr) + if ka == nil { + return + } + // set last tried time to now + ka.attempts++ + ka.lastattempt = time.Now() +} + +// Connected Marks the given address as currently connected and working at the +// current time. The address must already be known to AddrManager else it will +// be ignored. +func (a *AddrManager) Connected(addr *btcwire.NetAddress) { + a.mtx.Lock() + defer a.mtx.Unlock() + + ka := a.find(addr) + if ka == nil { + return + } + + // Update the time as long as it has been 20 minutes since last we did + // so. + now := time.Now() + if now.After(ka.na.Timestamp.Add(time.Minute * 20)) { + ka.na.Timestamp = time.Now() + } +} + +// Good marks the given address as good. To be called after a successful +// connection and version exchange. If the address is unkownto the addresss +// manager it will be ignored. +func (a *AddrManager) Good(addr *btcwire.NetAddress) { + a.mtx.Lock() + defer a.mtx.Unlock() + + ka := a.find(addr) + if ka == nil { + return + } + now := time.Now() + ka.lastsuccess = now + ka.lastattempt = now + ka.na.Timestamp = now + ka.attempts = 0 + + // move to tried set, optionally evicting other addresses if neeed. + if ka.tried { + return + } + + // ok, need to move it to tried. + + // remove from new buckets. + addrKey := NetAddressKey(addr) + delete(a.addrNew, addrKey) + + // is tried full? or is it ok? + if a.addrTried.Len() < triedBucketSize { + a.addrTried.PushBack(ka) + return + } + + // No room, we have to evict something else. + + // pick another one to throw out + entry := a.pickTried() + rmka := entry.Value.(*knownAddress) + + rmkey := NetAddressKey(rmka.na) + + // replace with ka. + entry.Value = ka + + rmka.tried = false + + log.Tracef("[AMGR] replacing %s with %s in tried", rmkey, addrKey) + + // We know there is space for it since we just moved out of new. + // TODO(oga) when we move to multiple buckets then we will need to + // check for size and consider putting it elsewhere. + a.addrNew[rmkey] = rmka +} + +// RFC1918: IPv4 Private networks (10.0.0.0/8, 192.168.0.0/16, 172.16.0.0/12) +var rfc1918ten = net.IPNet{IP: net.ParseIP("10.0.0.0"), + Mask: net.CIDRMask(8, 32)} +var rfc1918oneninetwo = net.IPNet{IP: net.ParseIP("192.168.0.0"), + Mask: net.CIDRMask(16, 32)} +var rfc1918oneseventwo = net.IPNet{IP: net.ParseIP("172.16.0.0"), + Mask: net.CIDRMask(12, 32)} + +func RFC1918(na *btcwire.NetAddress) bool { + return rfc1918ten.Contains(na.IP) || + rfc1918oneninetwo.Contains(na.IP) || + rfc1918oneseventwo.Contains(na.IP) +} + +// RFC3849 IPv6 Documentation address (2001:0DB8::/32) +var rfc3849 = net.IPNet{IP: net.ParseIP("2001:0DB8::"), + Mask: net.CIDRMask(32, 128)} + +func RFC3849(na *btcwire.NetAddress) bool { + return rfc3849.Contains(na.IP) +} + +// RFC3927 IPv4 Autoconfig (169.254.0.0/16) +var rfc3927 = net.IPNet{IP: net.ParseIP("169.254.0.0"), Mask: net.CIDRMask(16, 32)} + +func RFC3927(na *btcwire.NetAddress) bool { + return rfc3927.Contains(na.IP) +} + +// RFC3964 IPv6 6to4 (2002::/16) +var rfc3964 = net.IPNet{IP: net.ParseIP("2002::"), + Mask: net.CIDRMask(16, 128)} + +func RFC3964(na *btcwire.NetAddress) bool { + return rfc3964.Contains(na.IP) +} + +// RFC4193 IPv6 unique local (FC00::/15) +var rfc4193 = net.IPNet{IP: net.ParseIP("FC00::"), + Mask: net.CIDRMask(15, 128)} + +func RFC4193(na *btcwire.NetAddress) bool { + return rfc4193.Contains(na.IP) +} + +// RFC4380 IPv6 Teredo tunneling (2001::/32) +var rfc4380 = net.IPNet{IP: net.ParseIP("2001::"), + Mask: net.CIDRMask(32, 128)} + +func RFC4380(na *btcwire.NetAddress) bool { + return rfc4380.Contains(na.IP) +} + +// RFC4843 IPv6 ORCHID: (2001:10::/28) +var rfc4843 = net.IPNet{IP: net.ParseIP("2001;10::"), + Mask: net.CIDRMask(28, 128)} + +func RFC4843(na *btcwire.NetAddress) bool { + return rfc4843.Contains(na.IP) +} + +// RFC4862 IPv6 Autoconfig (FE80::/64) +var rfc4862 = net.IPNet{IP: net.ParseIP("FE80::"), + Mask: net.CIDRMask(64, 128)} + +func RFC4862(na *btcwire.NetAddress) bool { + return rfc4862.Contains(na.IP) +} + +// RFC6052: IPv6 well known prefix (64:FF9B::/96) +var rfc6052 = net.IPNet{IP: net.ParseIP("64::FF9B::"), + Mask: net.CIDRMask(96, 128)} + +func RFC6052(na *btcwire.NetAddress) bool { + return rfc6052.Contains(na.IP) +} + +// RFC6145: IPv6 IPv4 translated address ::FFFF:0:0:0/96 +var rfc6145 = net.IPNet{IP: net.ParseIP("::FFFF:0:0:0"), + Mask: net.CIDRMask(96, 128)} + +func RFC6145(na *btcwire.NetAddress) bool { + return rfc6145.Contains(na.IP) +} + +func Tor(na *btcwire.NetAddress) bool { + // bitcoind encodes a .onion address as a 16 byte number by decoding the + // address prior to the .onion (i.e. the key hash) base32 into a ten + // byte number. it then stores the first 6 bytes of the address as + // 0xfD, 0x87, 0xD8, 0x7e, 0xeb, 0x43 + // making the format + // { magic 6 bytes, 10 bytes base32 decode of key hash } + // Since we use btcwire.NetAddress to represent and address we may + // well have to emulate this. + // XXX fillmein + return false +} + +var zero4 = net.IPNet{IP: net.ParseIP("0.0.0.0"), + Mask: net.CIDRMask(8, 32)} + +func Local(na *btcwire.NetAddress) bool { + return na.IP.IsLoopback() || zero4.Contains(na.IP) +} + +// Valid returns true if an address is not one of the invalid formats. +// For IPv4 these are either a 0 or all bits set address. For IPv6 a zero +// address or one that matches the RFC3849 documentation address format. +func Valid(na *btcwire.NetAddress) bool { + // IsUnspecified returns if address is 0, so only all bits set, and + // RFC3849 need to be explicitly checked. bitcoind here also checks for + // invalid protocol addresses from earlier versions of bitcoind (before + // 0.2.9), however, since protocol versions before 70001 are + // disconnected by the bitcoin network now we have elided it. + return !(na.IP.IsUnspecified() || RFC3849(na) || + na.IP.Equal(net.IPv4bcast)) +} + +// Routable returns whether a netaddress is routable on the public internet or +// not. This is true as long as the address is valid and is not in any reserved +// ranges. +func Routable(na *btcwire.NetAddress) bool { + return Valid(na) && !(RFC1918(na) || RFC3927(na) || RFC4862(na) || + RFC4193(na) || Tor(na) || RFC4843(na) || Local(na)) +} + +// GroupKey returns a string representing the network group an address +// is part of. +// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string +// "local" for a local address and the string "unroutable for an unroutable +// address. +func GroupKey(na *btcwire.NetAddress) string { + if Local(na) { + return "local" + } + if !Routable(na) { + return "unroutable" + } + + if ipv4 := na.IP.To4(); ipv4 != nil { + return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String() + } + if RFC6145(na) || RFC6052(na) { + // last four bytes are the ip address + ip := net.IP(na.IP[12:16]) + return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() + } + + if RFC3964(na) { + ip := net.IP(na.IP[2:7]) + return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() + + } + if RFC4380(na) { + // teredo tunnels have the last 4 bytes as the v4 address XOR + // 0xff. + ip := net.IP(make([]byte, 4)) + for i, byte := range na.IP[12:16] { + ip[i] = byte ^ 0xff + } + return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String() + } + // XXX tor? + if Tor(na) { + panic("oga should have implemented me") + } + + // OK, so now we know ourselves to be a IPv6 address. + // bitcoind uses /32 for everything but what it calls he.net, which is + // it uses /36 for. he.net is actualy 2001:470::/32, whereas bitcoind + // counts it as 2011:470::/32. + + bits := 32 + heNet := &net.IPNet{IP: net.ParseIP("2011:470::"), + Mask: net.CIDRMask(32, 128)} + if heNet.Contains(na.IP) { + bits = 36 + } + + return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() +} diff --git a/btcd.go b/btcd.go index db665b30..dc7e6f03 100644 --- a/btcd.go +++ b/btcd.go @@ -9,14 +9,10 @@ import ( "github.com/conformal/btcchain" "github.com/conformal/btcdb" "github.com/conformal/btcscript" - "github.com/conformal/btcwire" "github.com/conformal/seelog" - "math/rand" "net" "os" "runtime" - "strconv" - "time" ) // These constants are used by the dns seed code to pick a random last seen @@ -142,42 +138,6 @@ func btcdMain() error { } server.Start() - // only ask dns for peers if we don't have a list of initial seeds. - if !cfg.DisableDNSSeed { - proxy := "" - if cfg.Proxy != "" && cfg.UseTor { - proxy = cfg.Proxy - } - seedpeers := dnsDiscover(activeNetParams.dnsSeeds, proxy) - addresses := make([]*btcwire.NetAddress, len(seedpeers)) - // if this errors then we have *real* problems - intPort, _ := strconv.Atoi(activeNetParams.peerPort) - for i, peer := range seedpeers { - addresses[i] = new(btcwire.NetAddress) - addresses[i].SetAddress(peer, uint16(intPort)) - // bitcoind seeds with addresses from - // a time randomly selected between 3 - // and 7 days ago. - addresses[i].Timestamp = time.Now().Add(-1 * - time.Second * time.Duration(secondsIn3Days+ - rand.Int31n(secondsIn4Days))) - } - - server.addrManager.AddAddresses(addresses) - // XXX if this is empty do we want to use hardcoded - // XXX peers like bitcoind does? - } - - peers := cfg.ConnectPeers - if len(peers) == 0 { - peers = cfg.AddPeers - } - // Connect to initial peers. - for _, addr := range peers { - // Connect to peer and add it to the server. - server.ConnectPeerAsync(addr, true) - } - server.WaitForShutdown() return nil } diff --git a/discovery.go b/discovery.go index 76896385..e3c04119 100644 --- a/discovery.go +++ b/discovery.go @@ -149,22 +149,18 @@ func torLookupIP(host, proxy string) ([]net.IP, error) { // resolution. If any errors occur then the seeder that errored will not have // any hosts in the list. Therefore if all hosts failed an empty slice of // strings will be returned. -func dnsDiscover(seeders []string, proxy string) []net.IP { - peers := []net.IP{} - for _, seeder := range seeders { - log.Debugf("[DISC] Fetching list of seeds from %v", seeder) - newPeers, err := doDNSLookup(seeder, proxy) - if err != nil { - seederPlusProxy := seeder - if proxy != "" { - seederPlusProxy = fmt.Sprintf("%s (proxy %s)", - seeder, proxy) - } - log.Warnf("[DISC] Failed to fetch dns seeds "+ - "from %s: %v", seederPlusProxy, err) - continue +func dnsDiscover(seeder string, proxy string) []net.IP { + log.Debugf("[DISC] Fetching list of seeds from %v", seeder) + peers, err := doDNSLookup(seeder, proxy) + if err != nil { + seederPlusProxy := seeder + if proxy != "" { + seederPlusProxy = fmt.Sprintf("%s (proxy %s)", + seeder, proxy) } - peers = append(peers, newPeers...) + log.Warnf("[DISC] Failed to fetch dns seeds "+ + "from %s: %v", seederPlusProxy, err) + return []net.IP{} } return peers diff --git a/peer.go b/peer.go index 9e22105e..d46beed7 100644 --- a/peer.go +++ b/peer.go @@ -97,6 +97,8 @@ type peer struct { services btcwire.ServiceFlag started bool conn net.Conn + addr string + na *btcwire.NetAddress timeConnected time.Time inbound bool disconnect bool @@ -228,15 +230,15 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { p.Disconnect() return } + } - // Add inbound peer address to the server address manager. - na, err := btcwire.NewNetAddress(p.conn.RemoteAddr(), p.services) - if err != nil { - log.Errorf("[PEER] %v", err) - p.Disconnect() - return - } - p.server.addrManager.AddAddress(na) + var err error + // Set up a netaddress for the peer to be used with addrmanager.. + p.na, err = newNetAddress(p.conn.RemoteAddr(), p.services) + if err != nil { + log.Errorf("[PEER] %v", err) + p.Disconnect() + return } // Send verack. @@ -263,10 +265,19 @@ func (p *peer) handleVersionMsg(msg *btcwire.MsgVersion) { // Request known addresses if the server address manager needs // more and the peer has a protocol version new enough to // include a timestamp with addresses. + // XXX bitcoind only does this if we have < 1000 addresses, not + // the max of 2400 hasTimestamp := p.protocolVersion >= btcwire.NetAddressTimeVersion if p.server.addrManager.NeedMoreAddresses() && hasTimestamp { p.outputQueue <- btcwire.NewMsgGetAddr() } + // Add inbound peer address to the server address manager. + p.server.addrManager.Good(p.na) + } else { + if NetAddressKey(&msg.AddrMe) == NetAddressKey(p.na) { + p.server.addrManager.AddAddress(p.na, p.na) + p.server.addrManager.Good(p.na) + } } // Signal the block manager this peer is a new sync candidate. @@ -758,7 +769,7 @@ func (p *peer) handleAddrMsg(msg *btcwire.MsgAddr) { // Add addresses to server address manager. The address manager handles // the details of things such as preventing duplicate addresses, max // addresses, and last seen updates. - p.server.addrManager.AddAddresses(msg.AddrList) + p.server.addrManager.AddAddresses(msg.AddrList, p.na) } // handlePingMsg is invoked when a peer receives a ping bitcoin message. For @@ -882,10 +893,12 @@ out: break out } + markConnected := false // Handle each supported message type. switch msg := rmsg.(type) { case *btcwire.MsgVersion: p.handleVersionMsg(msg) + markConnected = true case *btcwire.MsgVerAck: // Do nothing. @@ -895,9 +908,11 @@ out: case *btcwire.MsgAddr: p.handleAddrMsg(msg) + markConnected = true case *btcwire.MsgPing: p.handlePingMsg(msg) + markConnected = true case *btcwire.MsgPong: // Don't do anything, but could try to work out network @@ -911,9 +926,11 @@ out: case *btcwire.MsgInv: p.handleInvMsg(msg) + markConnected = true case *btcwire.MsgGetData: p.handleGetDataMsg(msg) + markConnected = true case *btcwire.MsgGetBlocks: p.handleGetBlocksMsg(msg) @@ -925,6 +942,14 @@ out: log.Debugf("[PEER] Received unhandled message of type %v: Fix Me", rmsg.Command()) } + if markConnected && !p.disconnect { + if p.na == nil { + log.Warnf("we're getting stuff before we " + + "got a version message. that's bad") + continue + } + p.server.addrManager.Connected(p.na) + } } // Ensure connection is closed and notify server and block manager that @@ -1047,28 +1072,51 @@ func (p *peer) Start() error { // a flag so the impending shutdown can be detected. func (p *peer) Disconnect() { p.disconnect = true - p.conn.Close() + if p.conn != nil { + p.conn.Close() + } } // Shutdown gracefully shuts down the peer by disconnecting it and waiting for // all goroutines to finish. func (p *peer) Shutdown() { - log.Tracef("[PEER] Shutdown peer %s", p.conn.RemoteAddr()) + log.Tracef("[PEER] Shutdown peer %s", p.addr) p.Disconnect() p.wg.Wait() } // newPeer returns a new bitcoin peer for the provided server and connection. // Use start to begin processing incoming and outgoing messages. -func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer { +func newPeer(s *server, conn net.Conn) *peer { p := peer{ server: s, protocolVersion: btcwire.ProtocolVersion, btcnet: s.btcnet, services: btcwire.SFNodeNetwork, conn: conn, + addr: conn.RemoteAddr().String(), timeConnected: time.Now(), - inbound: inbound, + inbound: true, + persistent: false, + knownAddresses: make(map[string]bool), + outputQueue: make(chan btcwire.Message, outputBufferSize), + quit: make(chan bool), + } + return &p +} + +// newOutbountPeer returns a new bitcoin peer for the provided server and +// address and connects to it asynchronously. If the connetion is successful +// then the peer will also be started. +func newOutboundPeer(s *server, addr string, persistent bool) *peer { + p := peer{ + server: s, + protocolVersion: btcwire.ProtocolVersion, + btcnet: s.btcnet, + services: btcwire.SFNodeNetwork, + addr: addr, + timeConnected: time.Now(), + inbound: false, persistent: persistent, knownAddresses: make(map[string]bool), knownInventory: NewMruInventoryMap(maxKnownInventory), @@ -1079,5 +1127,82 @@ func newPeer(s *server, conn net.Conn, inbound bool, persistent bool) *peer { blockProcessed: make(chan bool, 1), quit: make(chan bool), } + // set up p.na with a temporary address that we are connecting to with + // faked up service flags. We will replace this with the real one after + // version negotiation is successful. The only failure case here would + // be if the string was incomplete for connection so can't be split + // into address and port, and thus this would be invalid anyway. In + // which case we return nil to be handled by the caller. + // This must be done before we fork off the goroutine because as soon + // as this function returns the peer must have a valid netaddress. + ip, portStr, err := net.SplitHostPort(addr) + if err != nil { + log.Errorf("tried to create a new outbound peer with invalid "+ + "address %s: %v", addr, err) + return nil + } + + port, err := strconv.ParseUint(portStr, 10, 16) + if err != nil { + log.Errorf("tried to create a new outbound peer with invalid "+ + "port %s: %v", portStr, err) + return nil + } + p.na = btcwire.NewNetAddressIPPort(net.ParseIP(ip), uint16(port), 0) + + go func() { + // Select which dial method to call depending on whether or + // not a proxy is configured. Also, add proxy information to + // logged address if needed. + dial := net.Dial + faddr := addr + if cfg.Proxy != "" { + proxy := &socks.Proxy{cfg.Proxy, cfg.ProxyUser, cfg.ProxyPass} + dial = proxy.Dial + faddr = fmt.Sprintf("%s via proxy %s", addr, cfg.Proxy) + } + p.wg.Add(1) + + // Attempt to connect to the peer. If the connection fails and + // this is a persistent connection, retry after the retry + // interval. + for !s.shutdown { + log.Debugf("[SRVR] Attempting to connect to %s", faddr) + conn, err := dial("tcp", addr) + if err != nil { + log.Errorf("[SRVR] failed to connect to %s: %v", + faddr, err) + if !persistent { + p.server.donePeers <- &p + p.wg.Done() + return + } + log.Infof("[SRVR] Retrying connection to %s "+ + "in %s", faddr, connectionRetryInterval) + time.Sleep(connectionRetryInterval) + continue + } + + // while we were sleeping trying to get connect then + // the server may have scheduled a shutdown. In that + // case we ditch the connection immediately. + if !s.shutdown { + + p.server.addrManager.Attempt(p.na) + + // Connection was successful so log it and start peer. + log.Infof("[SRVR] Connected to %s", conn.RemoteAddr()) + p.conn = conn + p.Start() + } else { + p.server.donePeers <- &p + } + // We are done here, Start() will have grabbed + // additional waitgroup entries if we are not shutting + // down. + p.wg.Done() + return + } + }() return &p } diff --git a/server.go b/server.go index 389a5276..c5135d78 100644 --- a/server.go +++ b/server.go @@ -9,8 +9,8 @@ import ( "fmt" "github.com/conformal/btcdb" "github.com/conformal/btcwire" - "github.com/conformal/go-socks" "net" + "strconv" "sync" "time" ) @@ -22,6 +22,9 @@ const supportedServices = btcwire.SFNodeNetwork // when connecting to persistent peers. const connectionRetryInterval = time.Second * 10 +// defaultMaxOutbound is the default number of max outbound peers. +const defaultMaxOutbound = 8 + // directionString is a helper function that returns a string that represents // the direction of a connection (inbound or outbound). func directionString(inbound bool) string { @@ -53,6 +56,7 @@ type server struct { newPeers chan *peer donePeers chan *peer banPeers chan *peer + wakeup chan bool relayInv chan *btcwire.InvVect broadcast chan broadcastMsg wg sync.WaitGroup @@ -62,29 +66,33 @@ type server struct { // handleAddPeerMsg deals with adding new peers. It is invoked from the // peerHandler goroutine. -func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p *peer) { +func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, p *peer) bool { + if p == nil { + return false + } + // Ignore new peers if we're shutting down. direction := directionString(p.inbound) if s.shutdown { log.Infof("[SRVR] New peer %s (%s) ignored - server is "+ - "shutting down", p.conn.RemoteAddr(), direction) + "shutting down", p.addr, direction) p.Shutdown() - return + return false } // Disconnect banned peers. - host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + host, _, err := net.SplitHostPort(p.addr) if err != nil { log.Errorf("[SRVR] %v", err) p.Shutdown() - return + return false } if banEnd, ok := banned[host]; ok { if time.Now().Before(banEnd) { log.Debugf("[SRVR] Peer %s is banned for another %v - "+ "disconnecting", host, banEnd.Sub(time.Now())) p.Shutdown() - return + return false } log.Infof("[SRVR] Peer %s is no longer banned", host) @@ -96,43 +104,52 @@ func (s *server) handleAddPeerMsg(peers *list.List, banned map[string]time.Time, // Limit max number of total peers. if peers.Len() >= cfg.MaxPeers { log.Infof("[SRVR] Max peers reached [%d] - disconnecting "+ - "peer %s (%s)", cfg.MaxPeers, p.conn.RemoteAddr(), - direction) + "peer %s (%s)", cfg.MaxPeers, p.addr, direction) p.Shutdown() - return + // TODO(oga) how to handle permanent peers here? + // they should be rescheduled. + return false } // Add the new peer and start it. - log.Infof("[SRVR] New peer %s (%s)", p.conn.RemoteAddr(), direction) + log.Infof("[SRVR] New peer %s (%s)", p.addr, direction) peers.PushBack(p) - p.Start() + if p.inbound { + p.Start() + } + + return true } // handleDonePeerMsg deals with peers that have signalled they are done. It is // invoked from the peerHandler goroutine. -func (s *server) handleDonePeerMsg(peers *list.List, p *peer) { +func (s *server) handleDonePeerMsg(peers *list.List, p *peer) bool { direction := directionString(p.inbound) for e := peers.Front(); e != nil; e = e.Next() { if e.Value == p { - peers.Remove(e) - log.Infof("[SRVR] Removed peer %s (%s)", - p.conn.RemoteAddr(), direction) // Issue an asynchronous reconnect if the peer was a // persistent outbound connection. - if !p.inbound && p.persistent { - addr := p.conn.RemoteAddr().String() - s.ConnectPeerAsync(addr, true) + if !p.inbound && p.persistent && !s.shutdown { + // attempt reconnect. + addr := p.addr + e.Value = newOutboundPeer(s, addr, true) + return false } - return + peers.Remove(e) + log.Infof("[SRVR] Removed peer %s (%s)", p.addr, + direction) + return true } } + log.Warnf("[SRVR] Lost peer %v that we never had!", p) + return false } // handleBanPeerMsg deals with banning peers. It is invoked from the // peerHandler goroutine. func (s *server) handleBanPeerMsg(banned map[string]time.Time, p *peer) { - host, _, err := net.SplitHostPort(p.conn.RemoteAddr().String()) + host, _, err := net.SplitHostPort(p.addr) if err != nil { log.Errorf("[SRVR] %v", err) return @@ -172,8 +189,12 @@ func (s *server) handleBroadcastMsg(peers *list.List, bmsg *broadcastMsg) { excluded = true } } + p := e.Value.(*peer) + // Don't broadcast to still connecting outbound peers . + if p.conn == nil { + excluded = true + } if !excluded { - p := e.Value.(*peer) p.QueueMessage(bmsg.message) } } @@ -192,7 +213,7 @@ func (s *server) listenHandler(listener net.Listener) { } continue } - s.AddPeer(newPeer(s, conn, true, false)) + s.AddPeer(newPeer(s, conn)) } s.wg.Done() log.Tracef("[SRVR] Listener handler done for %s", listener.Addr()) @@ -213,17 +234,80 @@ func (s *server) peerHandler() { log.Tracef("[SRVR] Starting peer handler") peers := list.New() bannedPeers := make(map[string]time.Time) + outboundPeers := 0 + maxOutbound := defaultMaxOutbound + if cfg.MaxPeers < maxOutbound { + maxOutbound = cfg.MaxPeers + } + var wakeupTimer *time.Timer = nil + + // Do initial DNS seeding to populate address manager. + if !cfg.DisableDNSSeed { + proxy := "" + if cfg.Proxy != "" && cfg.UseTor { + proxy = cfg.Proxy + } + for _, seeder := range activeNetParams.dnsSeeds { + seedpeers := dnsDiscover(seeder, proxy) + if len(seedpeers) == 0 { + continue + } + addresses := make([]*btcwire.NetAddress, len(seedpeers)) + // if this errors then we have *real* problems + intPort, _ := strconv.Atoi(activeNetParams.peerPort) + for i, peer := range seedpeers { + addresses[i] = new(btcwire.NetAddress) + addresses[i].SetAddress(peer, uint16(intPort)) + // bitcoind seeds with addresses from + // a time randomly selected between 3 + // and 7 days ago. + addresses[i].Timestamp = time.Now().Add(-1 * + time.Second * time.Duration(secondsIn3Days+ + s.addrManager.rand.Int31n(secondsIn4Days))) + } + + // Bitcoind uses a lookup of the dns seeder here. This + // is rather strange since the values looked up by the + // DNS seed lookups will vary quite a lot. + // to replicate this behaviour we put all addresses as + // having come from the first one. + s.addrManager.AddAddresses(addresses, addresses[0]) + } + // XXX if this is empty do we want to use hardcoded + // XXX peers like bitcoind does? + } + + // Start up persistent peers. + permanentPeers := cfg.ConnectPeers + if len(permanentPeers) == 0 { + permanentPeers = cfg.AddPeers + } + for _, addr := range permanentPeers { + if s.handleAddPeerMsg(peers, bannedPeers, + newOutboundPeer(s, addr, true)) { + outboundPeers++ + } + } + + // if nothing else happens, wake us up soon. + time.AfterFunc(10*time.Second, func() { s.wakeup <- true }) // Live while we're not shutting down or there are still connected peers. for !s.shutdown || peers.Len() != 0 { select { // New peers connected to the server. case p := <-s.newPeers: - s.handleAddPeerMsg(peers, bannedPeers, p) + if s.handleAddPeerMsg(peers, bannedPeers, p) && + !p.inbound { + outboundPeers++ + } // Disconnected peers. case p := <-s.donePeers: - s.handleDonePeerMsg(peers, p) + // handleDonePeerMsg return true if it removed a peer + if s.handleDonePeerMsg(peers, p) { + outboundPeers-- + } // Peer to ban. case p := <-s.banPeers: @@ -238,6 +322,10 @@ func (s *server) peerHandler() { case bmsg := <-s.broadcast: s.handleBroadcastMsg(peers, &bmsg) + // Used by timers below to wake us back up. + case <-s.wakeup: + // this page left intentionally blank + // Shutdown the peer handler. case <-s.quit: // Shutdown peers. @@ -246,6 +334,98 @@ func (s *server) peerHandler() { p.Shutdown() } } + + // Timer was just to make sure we woke up again soon. so cancel + // and remove it as soon as we next come around. + if wakeupTimer != nil { + wakeupTimer.Stop() + wakeupTimer = nil + } + + // Only try connect to more peers if we actually need more + if outboundPeers >= maxOutbound || s.shutdown { + continue + } + groups := make(map[string]int) + for e := peers.Front(); e != nil; e = e.Next() { + peer := e.Value.(*peer) + if !peer.inbound { + groups[GroupKey(peer.na)]++ + } + } + + tries := 0 + for outboundPeers < maxOutbound && + peers.Len() < cfg.MaxPeers && !s.shutdown { + // We bias like bitcoind does, 10 for no outgoing + // up to 90 (8) for the selection of new vs tried + //addresses. + + nPeers := outboundPeers + if nPeers > 8 { + nPeers = 8 + } + addr := s.addrManager.GetAddress("any", 10+nPeers*10) + if addr == nil { + break + } + key := GroupKey(addr.na) + // Address will not be invalid, local or unroutable + // because addrmanager rejects those on addition. + // Just check that we don't already have an address + // in the same group so that we are not connecting + // to the same network segment at the expense of + // others. bitcoind breaks out of the loop here, but + // we continue to try other addresses. + if groups[key] != 0 { + continue + } + + tries++ + // After 100 bad tries exit the loop and we'll try again + // later. + if tries > 100 { + break + } + + // XXX if we have limited that address skip + + // only allow recent nodes (10mins) after we failed 30 + // times + if time.Now().After(addr.lastattempt.Add(10*time.Minute)) && + tries < 30 { + continue + } + + // allow nondefault ports after 50 failed tries. + if fmt.Sprintf("%d", addr.na.Port) != + activeNetParams.peerPort && tries < 50 { + continue + } + + addrStr := NetAddressKey(addr.na) + + tries = 0 + // any failure will be due to banned peers etc. we have + // already checked that we have room for more peers. + if s.handleAddPeerMsg(peers, bannedPeers, + newOutboundPeer(s, addrStr, false)) { + outboundPeers++ + groups[key]++ + } + } + + // We we need more peers, wake up in ten seconds and try again. + if outboundPeers < maxOutbound && peers.Len() < cfg.MaxPeers { + time.AfterFunc(10*time.Second, func() { + s.wakeup <- true + }) + } + } + + if wakeupTimer != nil { + wakeupTimer.Stop() + wakeupTimer = nil } s.blockManager.Stop() @@ -279,52 +459,6 @@ func (s *server) BroadcastMessage(msg btcwire.Message, exclPeers ...*peer) { s.broadcast <- bmsg } -// ConnectPeerAsync attempts to asynchronously connect to addr. If successful, -// a new peer is created and added to the server's peer list. -func (s *server) ConnectPeerAsync(addr string, persistent bool) { - // Don't try to connect to a peer if the server is shutting down. - if s.shutdown { - return - } - - go func() { - // Select which dial method to call depending on whether or - // not a proxy is configured. Also, add proxy information to - // logged address if needed. - dial := net.Dial - faddr := addr - if cfg.Proxy != "" { - proxy := &socks.Proxy{cfg.Proxy, cfg.ProxyUser, cfg.ProxyPass} - dial = proxy.Dial - faddr = fmt.Sprintf("%s via proxy %s", addr, cfg.Proxy) - } - - // Attempt to connect to the peer. If the connection fails and - // this is a persistent connection, retry after the retry - // interval. - for !s.shutdown { - log.Debugf("[SRVR] Attempting to connect to %s", faddr) - conn, err := dial("tcp", addr) - if err != nil { - log.Errorf("[SRVR] %v", err) - if !persistent { - return - } - log.Infof("[SRVR] Retrying connection to %s "+ - "in %s", faddr, connectionRetryInterval) - time.Sleep(connectionRetryInterval) - continue - } - - // Connection was successful so log it and create a new - // peer. - log.Infof("[SRVR] Connected to %s", conn.RemoteAddr()) - s.AddPeer(newPeer(s, conn, false, persistent)) - return - } - }() -} - // Start begins accepting connections from peers. func (s *server) Start() { // Already started? @@ -465,6 +599,7 @@ func newServer(addr string, db btcdb.Db, btcnet btcwire.BitcoinNet) (*server, er newPeers: make(chan *peer, cfg.MaxPeers), donePeers: make(chan *peer, cfg.MaxPeers), banPeers: make(chan *peer, cfg.MaxPeers), + wakeup: make(chan bool), relayInv: make(chan *btcwire.InvVect, cfg.MaxPeers), broadcast: make(chan broadcastMsg, cfg.MaxPeers), quit: make(chan bool),