Go Torrent File Download Exceeding Time Limit

I’m stuck on Stage #JV8.

I’ve tried using a connection pool for each download worker and passing pointers to data through channels, but the download still seems to be very slow.

Here is the main download function:

// DownloadAll downloads the entire file described by meta, fanning piece
// downloads out to a bounded pool of peer workers, and returns the fully
// assembled file contents.
//
// Failed pieces are retried up to maxAttemptsPerPiece times; the whole
// download fails if any single piece exhausts its attempts.
func DownloadAll(meta *torrent.Metainfo, peerID [20]byte) ([]byte, error) {
	info := &meta.Info

	if info.PieceLength <= 0 {
		return nil, fmt.Errorf("invalid piece length %d", info.PieceLength)
	}

	// Compute number of pieces (ceiling division; last piece may be short).
	numPieces := int((info.Length + info.PieceLength - 1) / info.PieceLength)
	if numPieces <= 0 {
		return nil, fmt.Errorf("no pieces to download")
	}

	// Allocate full file buffer in memory.
	full := make([]byte, info.Length)

	// Get a list of peers from the tracker.
	tr, err := tracker.Announce(meta, peerID)
	if err != nil {
		return nil, fmt.Errorf("tracker announce failed: %w", err)
	}
	if len(tr.Peers) == 0 {
		return nil, fmt.Errorf("tracker returned no peers")
	}

	workerCount := maxWorkers
	if len(tr.Peers) < workerCount {
		workerCount = len(tr.Peers)
	}
	if workerCount == 0 {
		return nil, fmt.Errorf("no peers available")
	}

	// Buffer jobs for the worst case (every piece retried the maximum
	// number of times) so no send on jobs can ever block. With an
	// unbuffered channel this function could deadlock (main blocked on a
	// retry send while every worker was blocked sending a result), and a
	// separate seeding goroutine could panic by sending on jobs after
	// close(jobs). Buffering removes both hazards and lets us seed inline.
	jobs := make(chan *pieceJob, numPieces*maxAttemptsPerPiece)
	results := make(chan *pieceResult)

	var wg sync.WaitGroup
	wg.Add(workerCount)

	peers := tr.Peers

	// Start workers.
	for i := 0; i < workerCount; i++ {
		go func() {
			defer wg.Done()
			workerLoop(meta, peerID, peers, jobs, results)
		}()
	}

	// Close results once every worker has exited, so drains terminate.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Seed one initial job per piece. The buffer is large enough that this
	// never blocks, so no extra goroutine is needed.
	for i := 0; i < numPieces; i++ {
		jobs <- &pieceJob{Index: i, Attempt: 0}
	}
	// Don't close jobs yet: the loop below may enqueue retries.

	// drainResults unblocks workers still sending results after we stop
	// consuming them; the drain goroutine exits when results is closed by
	// the waiter above. Without this, workers leak on early exit.
	drainResults := func() {
		go func() {
			for range results {
			}
		}()
	}

	completed := 0
	completedPieces := make([]bool, numPieces)

	for res := range results {
		idx := res.Index

		if res.Err == nil {
			// Successful piece download.
			if !completedPieces[idx] {
				// Copy piece into final buffer at correct offset.
				offset := int64(idx) * info.PieceLength
				copy(full[offset:offset+int64(len(res.Data))], res.Data)

				completedPieces[idx] = true
				completed++
			}

			if completed == numPieces {
				// All pieces done; closing jobs lets workers exit.
				close(jobs)
				drainResults()
				break
			}
			continue
		}

		// An error occurred; decide whether to retry.
		if completedPieces[idx] {
			// Already have this piece from another attempt; ignore error.
			continue
		}

		if errors.Is(res.Err, peer.ErrPeerDoesNotHavePiece) {
			// This wasn't a "real" failure; it just means this peer can't
			// help. Attempt is still incremented below so the retry is
			// routed to a different peer.
		}

		if res.Attempt+1 >= maxAttemptsPerPiece {
			// Give up on this piece.
			close(jobs)
			drainResults()
			return nil, fmt.Errorf("piece %d failed after %d attempts: %w", idx, res.Attempt+1, res.Err)
		}

		// Retry the piece with increased attempt count (never blocks:
		// total sends are bounded by the channel's capacity).
		jobs <- &pieceJob{
			Index:   idx,
			Attempt: res.Attempt + 1,
		}
	}

	if completed != numPieces {
		return nil, fmt.Errorf("download incomplete: got %d/%d pieces", completed, numPieces)
	}

	return full, nil
}

And here’s the actual worker loop:

// workerLoop consumes piece jobs, downloads each piece from one of the
// given peers, and reports the outcome (data or error) on results. Peer
// client sessions are created lazily and cached per peer index; a session
// is dropped and re-dialed after any download error.
//
// NOTE(review): peerIdx depends only on job.Attempt, not on which worker
// is running — every worker dials the same peer for a given attempt
// number, and each worker keeps its own clients slice, so multiple
// concurrent connections to the same peer are opened. Peers that allow
// only one connection per client may drop the older ones — confirm
// against the tracker/peer spec in use.
func workerLoop(
	meta *torrent.Metainfo,
	peerID [20]byte,
	peers []tracker.Peer,
	jobs <-chan *pieceJob,
	results chan<- *pieceResult,
) {
	if len(peers) == 0 {
		return
	}

	// Per-worker cache of open sessions, one slot per peer.
	clients := make([]*peer.Client, len(peers))

	for job := range jobs {
		// Rotate through peers as the attempt count grows, so retries of
		// a piece are sent to a different peer each time.
		peerIdx := job.Attempt % len(peers)
		c := clients[peerIdx]

		// lazy-init session
		if c == nil {
			var err error
			c, err = peer.NewClient(meta, peerID, peers[peerIdx])
			if err != nil {
				// Report the dial/handshake failure as a piece failure so
				// the coordinator can retry with another peer.
				results <- &pieceResult{
					Index:   job.Index,
					Attempt: job.Attempt,
					Err:     err,
				}
				continue
			}
			clients[peerIdx] = c
		}

		data, err := c.DownloadPiece(&meta.Info, job.Index)
		if err != nil {
			// Close and drop the session so we reconnect.
			_ = c.Conn.Close()
			clients[peerIdx] = nil

			results <- &pieceResult{
				Index:   job.Index,
				Attempt: job.Attempt,
				Err:     err,
			}
			continue
		}

		results <- &pieceResult{
			Index:   job.Index,
			Attempt: job.Attempt,
			Data:    data,
			Err:     nil,
		}
	}

	// Cleanup: close any sessions still open once jobs is closed.
	for _, c := range clients {
		if c != nil {
			_ = c.Conn.Close()
		}
	}
}

Finally, here is the function that downloads a piece with the given index:

// DownloadPiece downloads and hash-verifies the piece at pieceIndex from
// this peer, using a pipelined request/response loop with at most
// pipelineDepth outstanding block requests. It returns the verified piece
// bytes, or an error if the peer lacks the piece, the connection fails,
// the peer chokes us, or the SHA-1 hash does not match.
func (c *Client) DownloadPiece(info *torrent.Info, pieceIndex int) ([]byte, error) {
	// Check if the peer has the piece with pieceIndex
	if !peerHasPiece(c.Bitfield, pieceIndex) {
		return nil, ErrPeerDoesNotHavePiece
	}

	// Calculate the exact length of this piece (the last piece may be
	// shorter than info.PieceLength).
	pieceLen, err := pieceLength(info, pieceIndex)
	if err != nil {
		return nil, err
	}
	buf := make([]byte, pieceLen)

	// Determine the expected SHA-1 hash for this piece
	expHash, err := expectedPieceHash(info, pieceIndex)
	if err != nil {
		return nil, err
	}

	// Prepare the list of blocks for this piece
	blocks := buildBlocks(pieceLen)

	// Pipelined request/response loop
	var (
		nextToRequest    = 0 // index into blocks
		blocksInProgress = 0 // number of outstanding requests
		completedBlocks  = 0
		totalBlocks      = len(blocks)
	)

	for completedBlocks < totalBlocks {
		// Send requests while we have capacity and blocks left, keeping
		// up to pipelineDepth requests in flight at once.
		for blocksInProgress < pipelineDepth && nextToRequest < totalBlocks {
			blk := &blocks[nextToRequest]
			if err := sendRequest(c.Conn, pieceIndex, blk.Begin, blk.Len); err != nil {
				return nil, fmt.Errorf(
					"failed to send request for block begin=%d len=%d: %w",
					blk.Begin, blk.Len, err,
				)
			}
			blocksInProgress++
			nextToRequest++
		}

		// Read messages until we handle a 'piece' block we care about
		msg, err := readMessage(c.Conn)
		if err != nil {
			return nil, fmt.Errorf("failed to read message while downloading piece: %w", err)
		}

		switch msg.ID {
		case msgPiece:
			// Parse piece message: index (4 bytes), begin (4 bytes), block (rest)
			if len(msg.Payload) < 8 {
				return nil, fmt.Errorf("piece message payload too short: %d", len(msg.Payload))
			}
			index := int(binary.BigEndian.Uint32(msg.Payload[0:4]))
			begin := int(binary.BigEndian.Uint32(msg.Payload[4:8]))
			blockData := msg.Payload[8:]

			// Only process blocks for the piece we requested
			if index != pieceIndex {
				// Ignore mismatched pieces
				continue
			}

			// Match the block by begin offset and length against what we
			// actually requested.
			blk := findBlock(blocks, begin, len(blockData))
			if blk == nil {
				// Could be a duplicate or something we didn't request so
				// ignore it for now.
				// NOTE(review): if a peer replies to one of our requests
				// with a different length than requested, that request is
				// never marked complete and blocksInProgress is never
				// decremented, so the loop can stall until a read error —
				// worth confirming whether peers in practice ever do this.
				continue
			}
			if blk.Done {
				// Already have it so ignore duplicates
				continue
			}

			// Copy data into the correct offset of the piece buffer.
			// Bounds are safe: begin/len were validated against our own
			// requested block list built from pieceLen.
			copy(buf[begin:begin+len(blockData)], blockData)
			blk.Done = true
			completedBlocks++
			blocksInProgress--

		case msgChoke:
			// TODO: Handle re-requests. Throw an error for now
			return nil, fmt.Errorf("peer choked while downloading piece %d", pieceIndex)

		default:
			// Ignore other messages for now (have, unchoke, keep-alive, ...)
			continue
		}
	}

	// Verify piece hash against the metainfo's expected SHA-1.
	sum := sha1.Sum(buf)
	if !bytes.Equal(sum[:], expHash[:]) {
		return nil, fmt.Errorf(
			"piece hash mismatch at index %d: expected %s, got %s",
			pieceIndex,
			hex.EncodeToString(expHash[:]),
			hex.EncodeToString(sum[:]),
		)
	}

	return buf, nil
}

I realize that this is a lot to go through but I just like being verbose for readability purposes.

Thank you in advance.

Edit: I added logs at the start and end of DownloadPiece function. I also logged the entry and exit of workers and total time the program took to finish. I should also mention I ran the code locally to get this output.

2025/11/21 18:45:03 worker started for peer 165.232.35.114:51443
2025/11/21 18:45:03 [peer 165.232.35.114:51443] downloading piece 1
2025/11/21 18:45:04 worker started for peer 165.232.41.73:51544
2025/11/21 18:45:04 [peer 165.232.41.73:51544] downloading piece 2
2025/11/21 18:45:04 [peer 165.232.35.114:51443] finished piece 1 in 129.510133ms
2025/11/21 18:45:04 worker EXITED for peer 165.232.35.114:51443 in 129.598804ms
2025/11/21 18:45:04 worker started for peer 165.232.38.164:51433
2025/11/21 18:45:04 [peer 165.232.38.164:51433] downloading piece 0
2025/11/21 18:45:04 [peer 165.232.41.73:51544] finished piece 2 in 126.009559ms
2025/11/21 18:45:04 worker EXITED for peer 165.232.41.73:51544 in 126.096717ms
2025/11/21 18:45:04 [peer 165.232.38.164:51433] finished piece 0 in 122.422948ms
2025/11/21 18:45:04 worker EXITED for peer 165.232.38.164:51433 in 122.518623ms
File downloaded successfully at the path /tmp/test.txt
Program took 935.36994ms to finish

The remote execution prints out the following logs:

[compile] Moved ./.codecrafters/run.sh → ./your_program.sh
[compile] Compilation successful.
[tester::#JV8] Running tests for Stage #JV8 (Download the whole file)
[tester::#JV8] Running ./your_program.sh download -o /tmp/torrents398844642/codercat.gif /tmp/torrents398844642/codercat.gif.torrent
[your_program] 2025/11/21 18:52:14 worker started for peer 165.232.38.164:51448
[your_program] 2025/11/21 18:52:14 [peer 165.232.38.164:51448] downloading piece 2
[your_program] 2025/11/21 18:52:14 [peer 165.232.38.164:51448] finished piece 2 in 481.471608ms
[tester::#JV8] timed out, test exceeded 20 seconds
[tester::#JV8] Test failed

Is there something wrong on the server side or am I missing something here? How can I exceed 20 seconds of runtime when the entire code takes about a second on my machine?

Hey @revtheundead, I added a few logs like this:

Looks like most of the workers ran into errors:

I wonder if it’s because all the workers got the same peerIndex (0)? :thinking:

1 Like

Hey @andy1li , reading the error message after the tests, I didn’t think there were any issues there. The torrent I had on disk worked just fine so I’ll be sure to look into this and get back to you as soon as I can. Thank you for taking the time to debug this :slight_smile:

1 Like

This was exactly the issue. Apparently I didn’t read the spec thoroughly and I didn’t know that I could only have one connection per peer. Upon receiving duplicate connections from my client, other peers closed older connections they had with me which caused my workers to become useless, resulting in an incomplete file.

I changed the peer picking logic to choose one peer per worker and the issue was resolved:

// Select peers for workers (random subset)
peers := pickRandomPeers(tr.Peers, workerCount)

jobs := make(chan *pieceJob, numPieces)
results := make(chan *pieceResult)

var wg sync.WaitGroup
wg.Add(workerCount)

// Start one worker per peer (up to workerCount)
for _, p := range peers {
	peer := p
	go func() {
		defer wg.Done()
		workerLoop(meta, peerID, peer, jobs, results)
	}()
}

And my worker loop was changed to accept a single peer instead of trying to hold a list of connections:

// workerLoop services piece jobs over a single persistent connection to
// one peer, reporting each outcome on results. If a download fails, the
// connection is dropped and re-dialed lazily before the next job, so one
// bad transfer doesn't kill the worker.
func workerLoop(
	meta *torrent.Metainfo,
	peerID [20]byte,
	p tracker.Peer,
	jobs <-chan *pieceJob,
	results chan<- *pieceResult,
) {
	// Establish the initial connection + handshake.
	c, err := peer.NewClient(meta, peerID, p)
	if err != nil {
		// Log the peer we tried to reach, not c: c is nil here and
		// dereferencing c.Addr would panic.
		log.Printf("worker for peer %v failed handshake: %v", p, err)
		return
	}
	// Deferred close must tolerate c being nil (it is reset to nil after
	// a failed download and may not have been re-established).
	defer func() {
		if c != nil {
			_ = c.Conn.Close()
		}
	}()

	for job := range jobs {
		index := job.Index

		// Reconnect lazily if the previous connection was dropped. If the
		// reconnect fails, report the job as failed instead of calling
		// methods on a nil client.
		if c == nil {
			c, err = peer.NewClient(meta, peerID, p)
			if err != nil {
				results <- &pieceResult{
					Index:   index,
					Attempt: job.Attempt,
					Err:     err,
				}
				continue
			}
		}

		// If the peer doesn't have this piece, report it so the
		// coordinator can re-enqueue the job for another peer.
		if !peer.PeerHasPiece(c.Bitfield, index) {
			results <- &pieceResult{
				Index:   index,
				Attempt: job.Attempt,
				Err:     peer.ErrPeerDoesNotHavePiece,
			}
			continue
		}

		data, err := c.DownloadPiece(&meta.Info, index)
		if err != nil {
			results <- &pieceResult{
				Index:   index,
				Attempt: job.Attempt,
				Err:     err,
			}

			// Drop the connection; the next job reconnects (with error
			// handling) above.
			_ = c.Conn.Close()
			c = nil
			continue
		}

		results <- &pieceResult{
			Index:   index,
			Attempt: job.Attempt,
			Data:    data,
			Err:     nil,
		}
	}
}

For the sake of clarity, that new helper is defined as such:

// pickRandomPeers returns a uniformly random subset of count peers from
// all. When all has count peers or fewer, it is returned unchanged. The
// input slice is never mutated: a copy is shuffled instead.
func pickRandomPeers(all []tracker.Peer, count int) []tracker.Peer {
	if len(all) <= count {
		return all
	}
	shuffled := append([]tracker.Peer(nil), all...)
	rand.Shuffle(len(shuffled), func(i, j int) {
		shuffled[i], shuffled[j] = shuffled[j], shuffled[i]
	})
	return shuffled[:count]
}

The rest is pretty much the same, although at the time of writing this I made some additional changes to log download stats and improve piece picking (just for the fun of it).

I should also mention that the reason I took the mod of attempts when picking a peer index was to avoid a problem with “rare” pieces. I thought that, theoretically, there could be a piece present in only one of the “many” peers I potentially had. Essentially, I didn’t want to give up on such pieces prematurely, since I was retrying failed pieces only three times before raising an error.

I guess moral of the story is “premature optimization is bad”. Trying to solve problems before they occur isn’t always a great idea since you can overlook stuff like this.

Thank you @andy1li! You’ve been a great help, this was really bugging me.

1 Like

This topic was automatically closed 5 days after the last reply. New replies are no longer allowed.