From 50292c7de7f42b491ca7940085bdc96e9a02e16f Mon Sep 17 00:00:00 2001 From: Justin Li Date: Fri, 20 Feb 2015 02:18:44 -0500 Subject: [PATCH] udp: Implement announces --- udp/protocol.go | 94 ++++++++++++++++++++++++++++++++++++++++++++++--- udp/writer.go | 49 ++++++++++++++++++++++++++ 2 files changed, 139 insertions(+), 4 deletions(-) create mode 100644 udp/writer.go diff --git a/udp/protocol.go b/udp/protocol.go index 99d57cc..cef4df9 100644 --- a/udp/protocol.go +++ b/udp/protocol.go @@ -7,12 +7,40 @@ package udp import ( "bytes" "encoding/binary" + "errors" "net" + + "github.com/chihaya/chihaya/stats" + "github.com/chihaya/chihaya/tracker/models" ) var initialConnectionID = []byte{0x04, 0x17, 0x27, 0x10, 0x19, 0x80} -func (srv *Server) handlePacket(packet []byte, addr *net.UDPAddr) (response []byte) { +var eventIDs = []string{"none", "completed", "started", "stopped"} + +var ( + errMalformedPacket = errors.New("malformed packet") + errMalformedIP = errors.New("malformed IP address") + errMalformedEvent = errors.New("malformed event ID") +) + +func writeHeader(response []byte, action uint32, transactionID []byte) { + binary.BigEndian.PutUint32(response, action) + copy(response[4:], transactionID) +} + +func handleTorrentError(err error, w *Writer) { + if err == nil { + return + } + + if _, ok := err.(models.ClientError); ok { + w.WriteError(err) + stats.RecordEvent(stats.ClientError) + } +} + +func (s *Server) handlePacket(packet []byte, addr *net.UDPAddr) (response []byte) { if len(packet) < 16 { return nil // Malformed, no client packets are less than 16 bytes. } @@ -22,6 +50,8 @@ func (srv *Server) handlePacket(packet []byte, addr *net.UDPAddr) (response []by transactionID := packet[12:16] generatedConnID := GenerateConnectionID(addr.IP) + writer := &Writer{transactionID: transactionID} + switch action { case 0: // Connect request. @@ -35,11 +65,67 @@ func (srv *Server) handlePacket(packet []byte, addr *net.UDPAddr) (response []by case 1: // Announce request. + writer.buf = new(bytes.Buffer) + ann, err := s.newAnnounce(packet, addr.IP) + + if err == nil { + err = s.tracker.HandleAnnounce(ann, writer) + } + + handleTorrentError(err, writer) + + case 2: + // Scrape request. + writer.buf = new(bytes.Buffer) + // handleTorrentError(s.tracker.HandleScrape(scrape, writer), writer) + } + + if writer.buf != nil { + response = writer.buf.Bytes() } return } -func writeHeader(response []byte, action uint32, transactionID []byte) { - binary.BigEndian.PutUint32(response, action) - copy(response[4:], transactionID) +func (s *Server) newAnnounce(packet []byte, ip net.IP) (*models.Announce, error) { + if len(packet) < 98 { + return nil, errMalformedPacket + } + + infohash := packet[16:36] + peerID := packet[36:56] + + downloaded := binary.BigEndian.Uint64(packet[56:64]) + left := binary.BigEndian.Uint64(packet[64:72]) + uploaded := binary.BigEndian.Uint64(packet[72:80]) + + eventID := packet[83] + if eventID > 3 { + return nil, errMalformedEvent + } + + ipbuf := packet[84:88] + if !bytes.Equal(ipbuf, []byte{0, 0, 0, 0}) { + ip = net.ParseIP(string(ipbuf)) + } + if ip == nil { + return nil, errMalformedIP + } + + // TODO(pushrax): what exactly is the key "key" used for? + + numWant := binary.BigEndian.Uint32(packet[92:96]) + port := binary.BigEndian.Uint16(packet[96:98]) + + return &models.Announce{ + Config: s.config, + Downloaded: downloaded, + Event: eventIDs[eventID], + IPv4: ip, + Infohash: string(infohash), + Left: left, + NumWant: int(numWant), + PeerID: string(peerID), + Port: port, + Uploaded: uploaded, + }, nil } diff --git a/udp/writer.go b/udp/writer.go new file mode 100644 index 0000000..1b6f6b1 --- /dev/null +++ b/udp/writer.go @@ -0,0 +1,49 @@ +// Copyright 2015 The Chihaya Authors. All rights reserved. +// Use of this source code is governed by the BSD 2-Clause license, +// which can be found in the LICENSE file. + +package udp + +import ( + "bytes" + "encoding/binary" + + "github.com/chihaya/chihaya/tracker/models" +) + +type Writer struct { + buf *bytes.Buffer + + transactionID []byte +} + +func (w *Writer) WriteError(err error) error { + w.writeHeader(3) + w.buf.WriteString(err.Error()) + w.buf.WriteRune('\000') + return nil +} + +func (w *Writer) WriteAnnounce(res *models.AnnounceResponse) error { + w.writeHeader(1) + binary.Write(w.buf, binary.BigEndian, uint32(res.Interval)) + binary.Write(w.buf, binary.BigEndian, uint32(res.Incomplete)) + binary.Write(w.buf, binary.BigEndian, uint32(res.Complete)) + + for _, peer := range res.IPv4Peers { + w.buf.Write(peer.IP) + binary.Write(w.buf, binary.BigEndian, peer.Port) + } + + return nil +} + +func (w *Writer) WriteScrape(res *models.ScrapeResponse) error { + w.writeHeader(2) + return nil +} + +func (w *Writer) writeHeader(action uint32) { + binary.Write(w.buf, binary.BigEndian, action) + w.buf.Write(w.transactionID) +}