diff --git a/cmd/dht.go b/cmd/dht.go
index 04d329e..d9064f5 100644
--- a/cmd/dht.go
+++ b/cmd/dht.go
@@ -62,7 +62,11 @@ func dhtCmd(cmd *cobra.Command, args []string) {
 	}
 
 	d := dht.New(dhtConf)
-	d.Start()
+	err := d.Start()
+	if err != nil {
+		log.Println("error starting dht: " + err.Error())
+		return
+	}
 
 	interruptChan := make(chan os.Signal, 1)
 	signal.Notify(interruptChan, os.Interrupt, syscall.SIGTERM, syscall.SIGINT)
diff --git a/cmd/root.go b/cmd/root.go
index 35d410c..38ab622 100644
--- a/cmd/root.go
+++ b/cmd/root.go
@@ -116,9 +116,5 @@ func loadConfig(path string) (Config, error) {
 	}
 
 	err = json.Unmarshal(raw, &c)
-	if err != nil {
-		return c, err
-	}
-
-	return c, nil
+	return c, err
 }
diff --git a/db/db.go b/db/db.go
index f93cdcd..3224249 100644
--- a/db/db.go
+++ b/db/db.go
@@ -296,6 +296,7 @@ func (s *SQL) GetStoredHashesInRange(ctx context.Context, start, end bits.Bitmap
 	}
 
 	var hash string
+ScanLoop:
 	for rows.Next() {
 		err := rows.Scan(&hash)
 		if err != nil {
@@ -304,7 +305,7 @@ func (s *SQL) GetStoredHashesInRange(ctx context.Context, start, end bits.Bitmap
 		}
 		select {
 		case <-ctx.Done():
-			break
+			break ScanLoop
 		case ch <- bits.FromHexP(hash):
 		}
 	}
diff --git a/dht/contact.go b/dht/contact.go
index 027b258..724e51f 100644
--- a/dht/contact.go
+++ b/dht/contact.go
@@ -108,12 +108,7 @@ func (c *Contact) UnmarshalBencode(b []byte) error {
 		return errors.Err("invalid IP")
 	}
 
-	err = bencode.DecodeBytes(raw[2], &c.Port)
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return bencode.DecodeBytes(raw[2], &c.Port)
 }
 
 func sortByDistance(contacts []Contact, target bits.Bitmap) {
diff --git a/dht/dht.go b/dht/dht.go
index b5b2965..9681730 100644
--- a/dht/dht.go
+++ b/dht/dht.go
@@ -70,11 +70,7 @@ func (dht *DHT) connect(conn UDPConn) error {
 	dht.node = NewNode(contact.ID)
 	dht.tokenCache = newTokenCache(dht.node, tokenSecretRotationInterval)
 
-	err = dht.node.Connect(conn)
-	if err != nil {
-		return err
-	}
-	return nil
+	return dht.node.Connect(conn)
 }
 
 // Start starts the dht
diff --git a/dht/dht_announce.go b/dht/dht_announce.go
index cccd2fb..19ecc75 100644
--- a/dht/dht_announce.go
+++ b/dht/dht_announce.go
@@ -57,7 +57,11 @@ func (dht *DHT) runAnnouncer() {
 		defer dht.grp.Done()
 		limiter := rate.NewLimiter(rate.Limit(dht.conf.AnnounceRate), dht.conf.AnnounceRate)
 		for {
-			limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow? so when grp is closed, wait returns
+			err := limiter.Wait(context.Background()) // TODO: should use grp.ctx somehow? so when grp is closed, wait returns
+			if err != nil {
+				log.Error(errors.Prefix("rate limiter", err))
+				continue
+			}
 			select {
 			case limitCh <- time.Now():
 			case <-dht.grp.Ch():
diff --git a/dht/node_finder.go b/dht/node_finder.go
index 60b2098..50cb14d 100644
--- a/dht/node_finder.go
+++ b/dht/node_finder.go
@@ -250,10 +250,6 @@ func (cf *contactFinder) probe(cycleID string) *Contact {
 	return cf.closest(res.Contacts...)
 }
 
-func (cf *contactFinder) probeClosestOutstanding() {
-
-}
-
 // appendNewToShortlist appends any new contacts to the shortlist and sorts it by distance
 // contacts that have already been added to the shortlist in the past are ignored
 func (cf *contactFinder) appendNewToShortlist(contacts []Contact) {
@@ -320,11 +316,7 @@ func (cf *contactFinder) isSearchFinished() bool {
 	cf.activeContactsMutex.Lock()
 	defer cf.activeContactsMutex.Unlock()
 
-	if len(cf.activeContacts) >= bucketSize {
-		return true
-	}
-
-	return false
+	return len(cf.activeContacts) >= bucketSize
 }
 
 func (cf *contactFinder) debug(format string, args ...interface{}) {
diff --git a/dht/routing_table.go b/dht/routing_table.go
index 30fc906..5449f18 100644
--- a/dht/routing_table.go
+++ b/dht/routing_table.go
@@ -263,14 +263,20 @@ func (rt *routingTable) Update(c Contact) {
 		}
 	}
 
-	b.UpdatePeer(peer{Contact: c, Distance: rt.id.Xor(c.ID)}, true)
+	err := b.UpdatePeer(peer{Contact: c, Distance: rt.id.Xor(c.ID)}, true)
+	if err != nil {
+		log.Error(err)
+	}
 }
 
 // Fresh refreshes a contact if its already in the routing table
 func (rt *routingTable) Fresh(c Contact) {
 	rt.mu.RLock()
 	defer rt.mu.RUnlock()
-	rt.bucketFor(c.ID).UpdatePeer(peer{Contact: c, Distance: rt.id.Xor(c.ID)}, false)
+	err := rt.bucketFor(c.ID).UpdatePeer(peer{Contact: c, Distance: rt.id.Xor(c.ID)}, false)
+	if err != nil {
+		log.Error(err)
+	}
 }
 
 // FailContact marks a contact as having failed, and removes it if it failed too many times
diff --git a/dht/rpc.go b/dht/rpc.go
index 2857df8..cc27fe3 100644
--- a/dht/rpc.go
+++ b/dht/rpc.go
@@ -110,6 +110,9 @@ func (rpc *rpcReceiver) IterativeFindValue(r *http.Request, args *RpcIterativeFi
 		return err
 	}
 	foundContacts, found, err := FindContacts(rpc.dht.node, key, false, nil)
+	if err != nil {
+		return err
+	}
 	result.Contacts = foundContacts
 	result.FoundValue = found
 	return nil
@@ -153,7 +156,11 @@ func (dht *DHT) runRPCServer(port int) {
 	s := rpc2.NewServer()
 	s.RegisterCodec(json.NewCodec(), "application/json")
 	s.RegisterCodec(json.NewCodec(), "application/json;charset=UTF-8")
-	s.RegisterService(&rpcReceiver{dht: dht}, "rpc")
+	err := s.RegisterService(&rpcReceiver{dht: dht}, "rpc")
+	if err != nil {
+		log.Error(errors.Prefix("registering rpc service", err))
+		return
+	}
 
 	handler := mux.NewRouter()
 	handler.Handle("/", s)
@@ -171,6 +178,10 @@ func (dht *DHT) runRPCServer(port int) {
 	}()
 
 	<-dht.grp.Ch()
-	server.Shutdown(context.Background())
+	err = server.Shutdown(context.Background())
+	if err != nil {
+		log.Error(errors.Prefix("shutting down rpc service", err))
+		return
+	}
 	wg.Wait()
 }
diff --git a/peer/server.go b/peer/server.go
index 0aecf49..9cb69f8 100644
--- a/peer/server.go
+++ b/peer/server.go
@@ -105,9 +105,8 @@ func (s *Server) handleConnection(conn net.Conn) {
 	for {
 		var request []byte
 		var response []byte
-		var err error
 
-		err = conn.SetReadDeadline(time.Now().Add(timeoutDuration))
+		err := conn.SetReadDeadline(time.Now().Add(timeoutDuration))
 		if err != nil {
 			log.Error(errors.FullTrace(err))
 		}
diff --git a/prism/prism.go b/prism/prism.go
index 190fbee..cdb2d5c 100644
--- a/prism/prism.go
+++ b/prism/prism.go
@@ -122,12 +122,7 @@ func (p *Prism) Start() error {
 		return err
 	}
 
-	err = p.cluster.Connect()
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return p.cluster.Connect()
 }
 
 // Shutdown gracefully shuts down the different prism components before exiting.
diff --git a/reflector/server.go b/reflector/server.go
index 5f0f2e9..cd3a38a 100644
--- a/reflector/server.go
+++ b/reflector/server.go
@@ -206,11 +206,7 @@ func (s *Server) doHandshake(conn net.Conn) error {
 	}
 
 	_, err = conn.Write(resp)
-	if err != nil {
-		return err
-	}
-
-	return nil
+	return err
 }
 
 func (s *Server) readBlobRequest(conn net.Conn) (int, string, bool, error) {
@@ -257,10 +253,7 @@ func (s *Server) sendBlobResponse(conn net.Conn, blobExists, isSdBlob bool) erro
 	}
 
 	_, err = conn.Write(response)
-	if err != nil {
-		return err
-	}
-	return nil
+	return err
 }
 
 func (s *Server) sendTransferResponse(conn net.Conn, receivedBlob, isSdBlob bool) error {
@@ -278,8 +271,5 @@ func (s *Server) sendTransferResponse(conn net.Conn, receivedBlob, isSdBlob bool
 	}
 
 	_, err = conn.Write(response)
-	if err != nil {
-		return err
-	}
-	return nil
+	return err
 }