From f7366fb51b496fa70de3816ce51344c0a143cb5f Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Sat, 14 Jul 2018 12:20:20 -0700
Subject: [PATCH 1/2] server: ensure we only fetch filters we know of

---
 server.go | 111 +++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 84 insertions(+), 27 deletions(-)

diff --git a/server.go b/server.go
index cbc13bc6..12925eea 100644
--- a/server.go
+++ b/server.go
@@ -762,8 +762,21 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
 		return
 	}
 
-	hashes, err := sp.server.chain.HeightToHashRange(int32(msg.StartHeight),
-		&msg.StopHash, wire.MaxGetCFiltersReqRange)
+	// We'll also ensure that the remote party is requesting a set of
+	// filters that we actually currently maintain.
+	switch msg.FilterType {
+	case wire.GCSFilterRegular:
+		break
+
+	default:
+		peerLog.Debugf("Filter request for unknown filter: %v",
+			msg.FilterType)
+		return
+	}
+
+	hashes, err := sp.server.chain.HeightToHashRange(
+		int32(msg.StartHeight), &msg.StopHash, wire.MaxGetCFiltersReqRange,
+	)
 	if err != nil {
 		peerLog.Debugf("Invalid getcfilters request: %v", err)
 		return
@@ -776,8 +789,9 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
 		hashPtrs[i] = &hashes[i]
 	}
 
-	filters, err := sp.server.cfIndex.FiltersByBlockHashes(hashPtrs,
-		msg.FilterType)
+	filters, err := sp.server.cfIndex.FiltersByBlockHashes(
+		hashPtrs, msg.FilterType,
+	)
 	if err != nil {
 		peerLog.Errorf("Error retrieving cfilters: %v", err)
 		return
@@ -785,10 +799,14 @@ func (sp *serverPeer) OnGetCFilters(_ *peer.Peer, msg *wire.MsgGetCFilters) {
 
 	for i, filterBytes := range filters {
 		if len(filterBytes) == 0 {
-			peerLog.Warnf("Could not obtain cfilter for %v", hashes[i])
+			peerLog.Warnf("Could not obtain cfilter for %v",
+				hashes[i])
 			return
 		}
-		filterMsg := wire.NewMsgCFilter(msg.FilterType, &hashes[i], filterBytes)
+
+		filterMsg := wire.NewMsgCFilter(
+			msg.FilterType, &hashes[i], filterBytes,
+		)
 		sp.QueueMessage(filterMsg, nil)
 	}
 }
@@ -800,19 +818,32 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
 		return
 	}
 
+	// We'll also ensure that the remote party is requesting a set of
+	// headers for filters that we actually currently maintain.
+	switch msg.FilterType {
+	case wire.GCSFilterRegular:
+		break
+
+	default:
+		peerLog.Debugf("Filter request for unknown headers for "+
+			"filter: %v", msg.FilterType)
+		return
+	}
+
 	startHeight := int32(msg.StartHeight)
 	maxResults := wire.MaxCFHeadersPerMsg
 
-	// If StartHeight is positive, fetch the predecessor block hash so we can
-	// populate the PrevFilterHeader field.
+	// If StartHeight is positive, fetch the predecessor block hash so we
+	// can populate the PrevFilterHeader field.
 	if msg.StartHeight > 0 {
 		startHeight--
 		maxResults++
 	}
 
 	// Fetch the hashes from the block index.
-	hashList, err := sp.server.chain.HeightToHashRange(startHeight,
-		&msg.StopHash, maxResults)
+	hashList, err := sp.server.chain.HeightToHashRange(
+		startHeight, &msg.StopHash, maxResults,
+	)
 	if err != nil {
 		peerLog.Debugf("Invalid getcfheaders request: %v", err)
 	}
@@ -833,8 +864,9 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
 	}
 
 	// Fetch the raw filter hash bytes from the database for all blocks.
-	filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(hashPtrs,
-		msg.FilterType)
+	filterHashes, err := sp.server.cfIndex.FilterHashesByBlockHashes(
+		hashPtrs, msg.FilterType,
+	)
 	if err != nil {
 		peerLog.Errorf("Error retrieving cfilter hashes: %v", err)
 		return
@@ -892,6 +924,7 @@ func (sp *serverPeer) OnGetCFHeaders(_ *peer.Peer, msg *wire.MsgGetCFHeaders) {
 
 	headersMsg.FilterType = msg.FilterType
 	headersMsg.StopHash = msg.StopHash
+
 	sp.QueueMessage(headersMsg, nil)
 }
 
@@ -902,21 +935,38 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		return
 	}
 
-	blockHashes, err := sp.server.chain.IntervalBlockHashes(&msg.StopHash,
-		wire.CFCheckptInterval)
+	// We'll also ensure that the remote party is requesting a set of
+	// checkpoints for filters that we actually currently maintain.
+	switch msg.FilterType {
+	case wire.GCSFilterRegular:
+		break
+
+	default:
+		peerLog.Debugf("Filter request for unknown checkpoints for "+
+			"filter: %v", msg.FilterType)
+		return
+	}
+
+	blockHashes, err := sp.server.chain.IntervalBlockHashes(
+		&msg.StopHash, wire.CFCheckptInterval,
+	)
 	if err != nil {
 		peerLog.Debugf("Invalid getcfilters request: %v", err)
 		return
 	}
 
-	var updateCache bool
-	var checkptCache []cfHeaderKV
+	var (
+		updateCache  bool
+		checkptCache []cfHeaderKV
+	)
 
+	// If the set of checkpoints requested goes back further than what
+	// we've already generated in our cache, then we'll need to update it.
 	if len(blockHashes) > len(checkptCache) {
-		// Update the cache if the checkpoint chain is longer than the cached
-		// one. This ensures that the cache is relatively stable and mostly
-		// overlaps with the best chain, since it follows the longest chain
-		// heuristic.
+		// Update the cache if the checkpoint chain is longer than the
+		// cached one. This ensures that the cache is relatively stable
+		// and mostly overlaps with the best chain, since it follows
+		// the longest chain heuristic.
 		updateCache = true
 
 		// Take write lock because we are going to update cache.
@@ -925,9 +975,13 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 
 		// Grow the checkptCache to be the length of blockHashes.
 		additionalLength := len(blockHashes) - len(checkptCache)
-		checkptCache = append(sp.server.cfCheckptCaches[msg.FilterType],
-			make([]cfHeaderKV, additionalLength)...)
+		checkptCache = append(
+			sp.server.cfCheckptCaches[msg.FilterType],
+			make([]cfHeaderKV, additionalLength)...,
+		)
 	} else {
+		// Otherwise, we don't need to update the cache as we already
+		// have enough headers pre-generated.
 		updateCache = false
 
 		// Take reader lock because we are not going to update cache.
@@ -946,8 +1000,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 	}
 
 	// Populate results with cached checkpoints.
-	checkptMsg := wire.NewMsgCFCheckpt(msg.FilterType, &msg.StopHash,
-		len(blockHashes))
+	checkptMsg := wire.NewMsgCFCheckpt(
+		msg.FilterType, &msg.StopHash, len(blockHashes),
+	)
 	for i := 0; i < forkIdx; i++ {
 		checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
 	}
@@ -958,8 +1013,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
 	}
 
-	filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(blockHashPtrs,
-		msg.FilterType)
+	filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
+		blockHashPtrs, msg.FilterType,
+	)
 	if err != nil {
 		peerLog.Errorf("Error retrieving cfilter headers: %v", err)
 		return
@@ -967,7 +1023,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 
 	for i, filterHeaderBytes := range filterHeaders {
 		if len(filterHeaderBytes) == 0 {
-			peerLog.Warnf("Could not obtain CF header for %v", blockHashPtrs[i])
+			peerLog.Warnf("Could not obtain CF header for %v",
+				blockHashPtrs[i])
 			return
 		}
 

From 5e86c374110b3974675669c57e046deed0d70a85 Mon Sep 17 00:00:00 2001
From: Olaoluwa Osuntokun
Date: Sat, 14 Jul 2018 20:50:58 -0700
Subject: [PATCH 2/2] server: fix bug in cf checkpoint serving

In this commit, we fix a bug in the way that we previously attempted to
serve cfcheckpoints. In the prior version, we would never actually fetch
the current length of the cache. As a result, after the first time the
checkpoints were fetched, we would continually grow the cache rather
than using what's already there when it's sufficient.

In this commit, we fix this behavior by always checking the length
first, then either keeping the write lock, or downgrading to a read
lock if the size was sufficient.
---
 server.go | 79 +++++++++++++++++++++++++++++++++++--------------------
 1 file changed, 50 insertions(+), 29 deletions(-)

diff --git a/server.go b/server.go
index 12925eea..e41f27e5 100644
--- a/server.go
+++ b/server.go
@@ -947,6 +947,9 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		return
 	}
 
+	// Now that we know the client is fetching a filter that we know of,
+	// we'll fetch the block hashes at each checkpoint interval so we can
+	// compare against our cache, and create new checkpoints if necessary.
 	blockHashes, err := sp.server.chain.IntervalBlockHashes(
 		&msg.StopHash, wire.CFCheckptInterval,
 	)
@@ -955,43 +958,55 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		return
 	}
 
-	var (
-		updateCache  bool
-		checkptCache []cfHeaderKV
+	checkptMsg := wire.NewMsgCFCheckpt(
+		msg.FilterType, &msg.StopHash, len(blockHashes),
 	)
 
-	// If the set of checkpoints requested goes back further than what
-	// we've already generated in our cache, then we'll need to update it.
-	if len(blockHashes) > len(checkptCache) {
-		// Update the cache if the checkpoint chain is longer than the
-		// cached one. This ensures that the cache is relatively stable
-		// and mostly overlaps with the best chain, since it follows
-		// the longest chain heuristic.
-		updateCache = true
+	// Fetch the current existing cache so we can decide if we need to
+	// extend it or if it's adequate as is.
+	sp.server.cfCheckptCachesMtx.Lock()
+	checkptCache := sp.server.cfCheckptCaches[msg.FilterType]
 
-		// Take write lock because we are going to update cache.
-		sp.server.cfCheckptCachesMtx.Lock()
+	// If the set of block hashes is beyond the current size of the cache,
+	// then we'll expand the size of the cache and also retain the write
+	// lock.
+	var updateCache bool
+	if len(blockHashes) > len(checkptCache) {
+		// Now that we know we'll need to modify the size of the cache,
+		// we'll defer the release of the write lock so we don't
+		// forget.
 		defer sp.server.cfCheckptCachesMtx.Unlock()
 
-		// Grow the checkptCache to be the length of blockHashes.
+		// We'll mark that we need to update the cache for below and
+		// also expand the size of the cache in place.
+		updateCache = true
+
 		additionalLength := len(blockHashes) - len(checkptCache)
+		newEntries := make([]cfHeaderKV, additionalLength)
+
+		peerLog.Infof("Growing size of checkpoint cache from %v to %v "+
+			"block hashes", len(checkptCache), len(blockHashes))
+
 		checkptCache = append(
 			sp.server.cfCheckptCaches[msg.FilterType],
-			make([]cfHeaderKV, additionalLength)...,
+			newEntries...,
 		)
 	} else {
-		// Otherwise, we don't need to update the cache as we already
-		// have enough headers pre-generated.
-		updateCache = false
-
-		// Take reader lock because we are not going to update cache.
+		// Otherwise, we'll release the write lock, then grab the read
+		// lock, as the cache is already properly sized.
+		sp.server.cfCheckptCachesMtx.Unlock()
 		sp.server.cfCheckptCachesMtx.RLock()
-		defer sp.server.cfCheckptCachesMtx.RUnlock()
-		checkptCache = sp.server.cfCheckptCaches[msg.FilterType]
+
+		peerLog.Tracef("Serving stale cache of size %v",
+			len(checkptCache))
+
+		defer sp.server.cfCheckptCachesMtx.RUnlock()
 	}
 
-	// Iterate backwards until the block hash is found in the cache.
+	// Now that we know the cache is of an appropriate size, we'll iterate
+	// backwards until we find the block hash. We do this as it's possible
+	// a re-org has occurred, so items in the db are now in the main chain
+	// while the cache has been partially invalidated.
 	var forkIdx int
 	for forkIdx = len(checkptCache); forkIdx > 0; forkIdx-- {
 		if checkptCache[forkIdx-1].blockHash == blockHashes[forkIdx-1] {
@@ -999,20 +1014,19 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		}
 	}
 
-	// Populate results with cached checkpoints.
-	checkptMsg := wire.NewMsgCFCheckpt(
-		msg.FilterType, &msg.StopHash, len(blockHashes),
-	)
+	// Now that we know how much of the cache is relevant for this
+	// query, we'll populate our checkpoint message with the cache as is.
+	// Shortly below, we'll populate the new elements of the cache.
 	for i := 0; i < forkIdx; i++ {
 		checkptMsg.AddCFHeader(&checkptCache[i].filterHeader)
 	}
 
-	// Look up any filter headers that aren't cached.
+	// We'll now collect the set of hashes that are beyond our cache so we
+	// can look up the filter headers to populate the final cache.
 	blockHashPtrs := make([]*chainhash.Hash, 0, len(blockHashes)-forkIdx)
 	for i := forkIdx; i < len(blockHashes); i++ {
 		blockHashPtrs = append(blockHashPtrs, &blockHashes[i])
 	}
-
 	filterHeaders, err := sp.server.cfIndex.FilterHeadersByBlockHashes(
 		blockHashPtrs, msg.FilterType,
 	)
@@ -1021,6 +1035,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		return
 	}
 
+	// Now that we have the full set of filter headers, we'll add them to
+	// the checkpoint message, and also update our cache in line.
 	for i, filterHeaderBytes := range filterHeaders {
 		if len(filterHeaderBytes) == 0 {
 			peerLog.Warnf("Could not obtain CF header for %v",
 				blockHashPtrs[i])
 			return
 		}
 
 		checkptMsg.AddCFHeader(filterHeader)
+
+		// If the new main chain is longer than what's in the cache,
+		// then we'll override it beyond the fork point.
 		if updateCache {
 			checkptCache[forkIdx+i] = cfHeaderKV{
 				blockHash:    blockHashes[forkIdx+i],
@@ -1044,6 +1063,8 @@ func (sp *serverPeer) OnGetCFCheckpt(_ *peer.Peer, msg *wire.MsgGetCFCheckpt) {
 		}
 	}
 
+	// Finally, we'll update the cache if we need to, and send the final
+	// message back to the requesting peer.
 	if updateCache {
 		sp.server.cfCheckptCaches[msg.FilterType] = checkptCache
 	}
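
Note for reviewers: the core of the second patch is the check-then-downgrade
locking pattern around the checkpoint cache. Below is a minimal, self-contained
Go sketch of that pattern; the plain []int cache and the serveCheckpoints
helper are illustrative stand-ins rather than btcd's actual types. The
downgrade is only safe here for the same reason it is in the patch: the cache
never shrinks, so any writer that slips in between the Unlock and the RLock
can only grow it.

package main

import (
	"fmt"
	"sync"
)

// cacheMtx guards cache, a stand-in for one entry of the server's
// cfCheckptCaches map.
var (
	cacheMtx sync.RWMutex
	cache    []int
)

// serveCheckpoints mirrors the locking pattern of the fix: grab the write
// lock up front so the length check and any growth are atomic, keep the
// write lock (via defer) only when the cache must grow, and otherwise
// downgrade to a read lock so concurrent requests can be served in parallel.
func serveCheckpoints(needed int) []int {
	cacheMtx.Lock()

	if needed > len(cache) {
		// The cache must grow, so retain the write lock for the
		// rest of the function.
		defer cacheMtx.Unlock()

		fmt.Printf("growing cache from %d to %d entries\n",
			len(cache), needed)
		cache = append(cache, make([]int, needed-len(cache))...)
	} else {
		// The cache is already large enough: release the write lock
		// and re-acquire as a reader. This is safe because the cache
		// only ever grows, so len(cache) >= needed still holds after
		// the gap between Unlock and RLock.
		cacheMtx.Unlock()
		cacheMtx.RLock()
		defer cacheMtx.RUnlock()
	}

	// Serve the first `needed` entries from the (possibly freshly
	// grown) cache.
	out := make([]int, needed)
	copy(out, cache[:needed])
	return out
}

func main() {
	fmt.Println(serveCheckpoints(4)) // grows the cache, keeps write lock
	fmt.Println(serveCheckpoints(2)) // downgrades to a read lock
}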