storage/redis: use redis reply

Change-Id: If6e6c2545b12c249413d3d13ea41e127b8d1d9b0
This commit is contained in:
onestraw 2019-01-21 13:24:43 +08:00
parent fa19ffd050
commit 9d22b67f74

View file

@ -202,7 +202,9 @@ func New(provided Config) (storage.PeerStore, error) {
case <-time.After(cfg.GarbageCollectionInterval): case <-time.After(cfg.GarbageCollectionInterval):
before := time.Now().Add(-cfg.PeerLifetime) before := time.Now().Add(-cfg.PeerLifetime)
log.Debug("storage: purging peers with no announces since", log.Fields{"before": before}) log.Debug("storage: purging peers with no announces since", log.Fields{"before": before})
ps.collectGarbage(before) if err = ps.collectGarbage(before); err != nil {
log.Error("storage: collectGarbage error", log.Fields{"before": before, "error": err})
}
} }
} }
}() }()
@ -298,29 +300,29 @@ func (ps *peerStore) populateProm() {
defer conn.Close() defer conn.Close()
for _, group := range ps.groups() { for _, group := range ps.groups() {
if n, err := conn.Do("GET", ps.infohashCountKey(group)); err != nil { if n, err := redis.Int64(conn.Do("GET", ps.infohashCountKey(group))); err != nil && err != redis.ErrNil {
log.Error("storage: GET counter failure", log.Fields{ log.Error("storage: GET counter failure", log.Fields{
"key": ps.infohashCountKey(group), "key": ps.infohashCountKey(group),
"error": err, "error": err,
}) })
} else { } else {
numInfohashes += n.(int64) numInfohashes += n
} }
if n, err := conn.Do("GET", ps.seederCountKey(group)); err != nil { if n, err := redis.Int64(conn.Do("GET", ps.seederCountKey(group))); err != nil && err != redis.ErrNil {
log.Error("storage: GET counter failure", log.Fields{ log.Error("storage: GET counter failure", log.Fields{
"key": ps.seederCountKey(group), "key": ps.seederCountKey(group),
"error": err, "error": err,
}) })
} else { } else {
numSeeders += n.(int64) numSeeders += n
} }
if n, err := conn.Do("GET", ps.leecherCountKey(group)); err != nil { if n, err := redis.Int64(conn.Do("GET", ps.leecherCountKey(group))); err != nil && err != redis.ErrNil {
log.Error("storage: GET counter failure", log.Fields{ log.Error("storage: GET counter failure", log.Fields{
"key": ps.leecherCountKey(group), "key": ps.leecherCountKey(group),
"error": err, "error": err,
}) })
} else { } else {
numLeechers += n.(int64) numLeechers += n
} }
} }
@ -357,20 +359,20 @@ func (ps *peerStore) PutSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) error
conn.Send("MULTI") conn.Send("MULTI")
conn.Send("HSET", encodedSeederInfoHash, pk, ct) conn.Send("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
reply, err := redis.Values(conn.Do("EXEC")) reply, err := redis.Int64s(conn.Do("EXEC"))
if err != nil { if err != nil {
return err return err
} }
// pk is a new field. // pk is a new field.
if reply[0].(int64) == 1 { if reply[0] == 1 {
_, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) _, err = conn.Do("INCR", ps.seederCountKey(addressFamily))
if err != nil { if err != nil {
return err return err
} }
} }
// encodedSeederInfoHash is a new field. // encodedSeederInfoHash is a new field.
if reply[1].(int64) == 1 { if reply[1] == 1 {
_, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily))
if err != nil { if err != nil {
return err return err
@ -400,11 +402,11 @@ func (ps *peerStore) DeleteSeeder(ih bittorrent.InfoHash, p bittorrent.Peer) err
encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String()) encodedSeederInfoHash := ps.seederInfohashKey(addressFamily, ih.String())
delNum, err := conn.Do("HDEL", encodedSeederInfoHash, pk) delNum, err := redis.Int64(conn.Do("HDEL", encodedSeederInfoHash, pk))
if err != nil { if err != nil {
return err return err
} }
if delNum.(int64) == 0 { if delNum == 0 {
return storage.ErrResourceDoesNotExist return storage.ErrResourceDoesNotExist
} }
if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil { if _, err := conn.Do("DECR", ps.seederCountKey(addressFamily)); err != nil {
@ -438,12 +440,12 @@ func (ps *peerStore) PutLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) error
conn.Send("MULTI") conn.Send("MULTI")
conn.Send("HSET", encodedLeecherInfoHash, pk, ct) conn.Send("HSET", encodedLeecherInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct) conn.Send("HSET", addressFamily, encodedLeecherInfoHash, ct)
reply, err := redis.Values(conn.Do("EXEC")) reply, err := redis.Int64s(conn.Do("EXEC"))
if err != nil { if err != nil {
return err return err
} }
// pk is a new field. // pk is a new field.
if reply[0].(int64) == 1 { if reply[0] == 1 {
_, err = conn.Do("INCR", ps.leecherCountKey(addressFamily)) _, err = conn.Do("INCR", ps.leecherCountKey(addressFamily))
if err != nil { if err != nil {
return err return err
@ -471,11 +473,11 @@ func (ps *peerStore) DeleteLeecher(ih bittorrent.InfoHash, p bittorrent.Peer) er
pk := newPeerKey(p) pk := newPeerKey(p)
encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String()) encodedLeecherInfoHash := ps.leecherInfohashKey(addressFamily, ih.String())
delNum, err := conn.Do("HDEL", encodedLeecherInfoHash, pk) delNum, err := redis.Int64(conn.Do("HDEL", encodedLeecherInfoHash, pk))
if err != nil { if err != nil {
return err return err
} }
if delNum.(int64) == 0 { if delNum == 0 {
return storage.ErrResourceDoesNotExist return storage.ErrResourceDoesNotExist
} }
if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil { if _, err := conn.Do("DECR", ps.leecherCountKey(addressFamily)); err != nil {
@ -511,23 +513,23 @@ func (ps *peerStore) GraduateLeecher(ih bittorrent.InfoHash, p bittorrent.Peer)
conn.Send("HDEL", encodedLeecherInfoHash, pk) conn.Send("HDEL", encodedLeecherInfoHash, pk)
conn.Send("HSET", encodedSeederInfoHash, pk, ct) conn.Send("HSET", encodedSeederInfoHash, pk, ct)
conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct) conn.Send("HSET", addressFamily, encodedSeederInfoHash, ct)
reply, err := redis.Values(conn.Do("EXEC")) reply, err := redis.Int64s(conn.Do("EXEC"))
if err != nil { if err != nil {
return err return err
} }
if reply[0].(int64) == 1 { if reply[0] == 1 {
_, err = conn.Do("DECR", ps.leecherCountKey(addressFamily)) _, err = conn.Do("DECR", ps.leecherCountKey(addressFamily))
if err != nil { if err != nil {
return err return err
} }
} }
if reply[1].(int64) == 1 { if reply[1] == 1 {
_, err = conn.Do("INCR", ps.seederCountKey(addressFamily)) _, err = conn.Do("INCR", ps.seederCountKey(addressFamily))
if err != nil { if err != nil {
return err return err
} }
} }
if reply[2].(int64) == 1 { if reply[2] == 1 {
_, err = conn.Do("INCR", ps.infohashCountKey(addressFamily)) _, err = conn.Do("INCR", ps.infohashCountKey(addressFamily))
if err != nil { if err != nil {
return err return err
@ -633,7 +635,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
conn := ps.rb.open() conn := ps.rb.open()
defer conn.Close() defer conn.Close()
leechersLen, err := conn.Do("HLEN", encodedLeecherInfoHash) leechersLen, err := redis.Int64(conn.Do("HLEN", encodedLeecherInfoHash))
if err != nil { if err != nil {
log.Error("storage: Redis HLEN failure", log.Fields{ log.Error("storage: Redis HLEN failure", log.Fields{
"Hkey": encodedLeecherInfoHash, "Hkey": encodedLeecherInfoHash,
@ -642,7 +644,7 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
return return
} }
seedersLen, err := conn.Do("HLEN", encodedSeederInfoHash) seedersLen, err := redis.Int64(conn.Do("HLEN", encodedSeederInfoHash))
if err != nil { if err != nil {
log.Error("storage: Redis HLEN failure", log.Fields{ log.Error("storage: Redis HLEN failure", log.Fields{
"Hkey": encodedSeederInfoHash, "Hkey": encodedSeederInfoHash,
@ -651,8 +653,8 @@ func (ps *peerStore) ScrapeSwarm(ih bittorrent.InfoHash, af bittorrent.AddressFa
return return
} }
resp.Incomplete = uint32(leechersLen.(int64)) resp.Incomplete = uint32(leechersLen)
resp.Complete = uint32(seedersLen.(int64)) resp.Complete = uint32(seedersLen)
return return
} }
@ -717,45 +719,41 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
for _, group := range ps.groups() { for _, group := range ps.groups() {
// list all infohashes in the group // list all infohashes in the group
infohashesList, err := conn.Do("HKEYS", group) infohashesList, err := redis.Strings(conn.Do("HKEYS", group))
if err != nil { if err != nil {
return err return err
} }
infohashes := infohashesList.([]interface{})
for _, ih := range infohashes { for _, ihStr := range infohashesList {
ihStr := string(ih.([]byte))
isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S" isSeeder := len(ihStr) > 5 && ihStr[5:6] == "S"
// list all (peer, timeout) pairs for the ih // list all (peer, timeout) pairs for the ih
ihList, err := conn.Do("HGETALL", ihStr) ihList, err := redis.Strings(conn.Do("HGETALL", ihStr))
if err != nil { if err != nil {
return err return err
} }
conIhList := ihList.([]interface{})
var pk serializedPeer var pk serializedPeer
var removedPeerCount int64 var removedPeerCount int64
for index, ihField := range conIhList { for index, ihField := range ihList {
if index%2 == 1 { // value if index%2 == 1 { // value
mtime, err := strconv.ParseInt(string(ihField.([]byte)), 10, 64) mtime, err := strconv.ParseInt(ihField, 10, 64)
if err != nil { if err != nil {
return err return err
} }
if mtime <= cutoffUnix { if mtime <= cutoffUnix {
log.Debug("storage: deleting peer", log.Fields{
"Peer": decodePeerKey(pk).String(),
})
ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk)) ret, err := redis.Int64(conn.Do("HDEL", ihStr, pk))
if err != nil { if err != nil {
return err return err
} }
removedPeerCount += ret removedPeerCount += ret
log.Debug("storage: deleting peer", log.Fields{
"Peer": decodePeerKey(pk).String(),
})
} }
} else { // key } else { // key
pk = serializedPeer(ihField.([]byte)) pk = serializedPeer([]byte(ihField))
} }
} }
// DECR seeder/leecher counter // DECR seeder/leecher counter
@ -763,9 +761,11 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
if isSeeder { if isSeeder {
decrCounter = ps.seederCountKey(group) decrCounter = ps.seederCountKey(group)
} }
if removedPeerCount > 0 {
if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil { if _, err := conn.Do("DECRBY", decrCounter, removedPeerCount); err != nil {
return err return err
} }
}
// use WATCH to avoid race condition // use WATCH to avoid race condition
// https://redis.io/topics/transactions // https://redis.io/topics/transactions
@ -773,11 +773,11 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
if err != nil { if err != nil {
return err return err
} }
ihLen, err := conn.Do("HLEN", ihStr) ihLen, err := redis.Int64(conn.Do("HLEN", ihStr))
if err != nil { if err != nil {
return err return err
} }
if ihLen.(int64) == 0 { if ihLen == 0 {
// Empty hashes are not shown among existing keys, // Empty hashes are not shown among existing keys,
// in other words, it's removed automatically after `HDEL` the last field. // in other words, it's removed automatically after `HDEL` the last field.
//_, err := conn.Do("DEL", ihStr) //_, err := conn.Do("DEL", ihStr)
@ -788,15 +788,17 @@ func (ps *peerStore) collectGarbage(cutoff time.Time) error {
conn.Send("DECR", ps.infohashCountKey(group)) conn.Send("DECR", ps.infohashCountKey(group))
} }
_, err = redis.Values(conn.Do("EXEC")) _, err = redis.Values(conn.Do("EXEC"))
if err != nil { if err != nil && err != redis.ErrNil {
log.Error("storage: Redis EXEC failure, maybe caused by WATCH, ignored", log.Fields{ log.Error("storage: Redis EXEC failure", log.Fields{
"group": group, "group": group,
"infohash": ihStr, "infohash": ihStr,
"error": err, "error": err,
}) })
} }
} else { } else {
conn.Do("UNWATCH", ihStr) if _, err = conn.Do("UNWATCH"); err != nil && err != redis.ErrNil {
log.Error("storage: Redis UNWATCH failure", log.Fields{"error": err})
}
} }
} }
} }