diff --git a/CHANGELOG.md b/CHANGELOG.md index 7b2722b..60cbdd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,11 @@ The following emojis are used to highlight certain changes: - AutoConf support: automatic configuration of bootstrap peers and delegated routing endpoints ([#123](https://github.com/ipfs/someguy/pull/123)). When enabled (default), the `auto` placeholder is replaced with network-recommended values. - All endpoint flags (`--provider-endpoints`, `--peer-endpoints`, `--ipns-endpoints`) default to `auto` - See [environment-variables.md](docs/environment-variables.md#someguy_autoconf) for configuration details +- Added `/routing/v1/peers/closest/{key}` endpoint implementing [IPIP-476](https://github.com/ipfs/specs/pull/476) + - Returns DHT-closest peers to a given CID or PeerID + - Accepts both CID and legacy PeerID formats (e.g., `12D3KooW...`) + - Uses WAN DHT only for more reliable results + - Includes cached addresses in results when available ### Changed diff --git a/client.go b/client.go index b83db7f..f7397e8 100644 --- a/client.go +++ b/client.go @@ -116,6 +116,47 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b return nil } +func getClosestPeers(ctx context.Context, key cid.Cid, endpoint string, prettyOutput bool) error { + drc, err := newDelegatedRoutingClient(endpoint) + if err != nil { + return err + } + + recordsIter, err := drc.GetClosestPeers(ctx, key) + if err != nil { + return err + } + defer recordsIter.Close() + + for recordsIter.Next() { + res := recordsIter.Val() + + // Check for error, but do not complain if we exceeded the timeout. We are + // expecting that to happen: we explicitly defined a timeout. 
+ if res.Err != nil { + if !errors.Is(res.Err, context.DeadlineExceeded) { + return res.Err + } + + return nil + } + + if prettyOutput { + fmt.Fprintln(os.Stdout, res.Val.ID) + fmt.Fprintln(os.Stdout, "\tProtocols:", res.Val.Protocols) + fmt.Fprintln(os.Stdout, "\tAddresses:", res.Val.Addrs) + fmt.Fprintln(os.Stdout) + } else { + err := json.NewEncoder(os.Stdout).Encode(res.Val) + if err != nil { + return err + } + } + } + + return nil +} + func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput bool) error { drc, err := newDelegatedRoutingClient(endpoint) if err != nil { diff --git a/go.mod b/go.mod index 28e0504..96c0ef6 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/dustin/go-humanize v1.0.1 github.com/felixge/httpsnoop v1.0.4 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/ipfs/boxo v0.35.2 + github.com/ipfs/boxo v0.35.3-0.20251118170232-e71f50ea2263 github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-log/v2 v2.9.0 github.com/libp2p/go-libp2p v0.45.0 diff --git a/go.sum b/go.sum index 736c7ed..4bb9f9f 100644 --- a/go.sum +++ b/go.sum @@ -259,8 +259,8 @@ github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc= github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= -github.com/ipfs/boxo v0.35.2 h1:0QZJJh6qrak28abENOi5OA8NjBnZM4p52SxeuIDqNf8= -github.com/ipfs/boxo v0.35.2/go.mod h1:bZn02OFWwJtY8dDW9XLHaki59EC5o+TGDECXEbe1w8U= +github.com/ipfs/boxo v0.35.3-0.20251118170232-e71f50ea2263 h1:7sSi4euS5Rb+RwQZOXrd/fURpC9kgbESD4DPykaLy0I= +github.com/ipfs/boxo v0.35.3-0.20251118170232-e71f50ea2263/go.mod 
h1:bZn02OFWwJtY8dDW9XLHaki59EC5o+TGDECXEbe1w8U= github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk= github.com/ipfs/go-block-format v0.2.3/go.mod h1:WJaQmPAKhD3LspLixqlqNFxiZ3BZ3xgqxxoSR/76pnA= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= diff --git a/main.go b/main.go index 3c8f7d4..84b9fec 100644 --- a/main.go +++ b/main.go @@ -359,6 +359,22 @@ func main() { return findPeers(ctx.Context, pid, endPoint, ctx.Bool("pretty")) }, }, + { + Name: "getclosestpeers", + Usage: "getclosestpeers ", + UsageText: "Find DHT-closest peers to a key (CID or peer ID)", + Action: func(ctx *cli.Context) error { + if ctx.NArg() != 1 { + return errors.New("invalid command, see help") + } + keyStr := ctx.Args().Get(0) + c, err := parseKey(keyStr) + if err != nil { + return err + } + return getClosestPeers(ctx.Context, c, ctx.String("endpoint"), ctx.Bool("pretty")) + }, + }, { Name: "getipns", Usage: "getipns ", @@ -413,3 +429,28 @@ func printIfListConfigured(message string, list []string) { fmt.Printf(message+"%v\n", strings.Join(list, ", ")) } } + +// parseKey parses a string that can be either a CID or a PeerID. +// It accepts the following formats: +// - Arbitrary CIDs (e.g., bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei) +// - CIDv1 with libp2p-key codec (e.g., bafzaajaiaejca...) +// - Base58-encoded PeerIDs (e.g., 12D3KooW... or QmYyQ...) +// +// Returns the key as a CID. PeerIDs are converted to CIDv1 with libp2p-key codec. 
+func parseKey(keyStr string) (cid.Cid, error) { + // Try parsing as PeerID first using peer.Decode + // This handles legacy PeerID formats per: + // https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation + pid, pidErr := peer.Decode(keyStr) + if pidErr == nil { + return peer.ToCid(pid), nil + } + + // Fall back to parsing as CID (handles arbitrary CIDs and CIDv1 libp2p-key format) + c, cidErr := cid.Parse(keyStr) + if cidErr == nil { + return c, nil + } + + return cid.Cid{}, fmt.Errorf("unable to parse as CID or PeerID: %w", errors.Join(cidErr, pidErr)) +} diff --git a/server_cached_router.go b/server_cached_router.go index d7e4806..7edc0b7 100644 --- a/server_cached_router.go +++ b/server_cached_router.go @@ -38,11 +38,12 @@ const ( addrCacheStateHit = "hit" addrCacheStateMiss = "miss" - // source=providers|peers indicates if query originated from provider or peer endpoint - addrQueryOriginLabel = "origin" - addrQueryOriginProviders = "providers" - addrQueryOriginPeers = "peers" - addrQueryOriginUnknown = "unknown" + // source=providers|peers|closest indicates if query originated from provider, peer, or closest peers endpoint + addrQueryOriginLabel = "origin" + addrQueryOriginProviders = "providers" + addrQueryOriginPeers = "peers" + addrQueryOriginClosestPeers = "closest" + addrQueryOriginUnknown = "unknown" DispatchedFindPeersTimeout = time.Minute ) @@ -64,7 +65,7 @@ func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) return nil, err } - iter := NewCacheFallbackIter(it, r, ctx) + iter := NewCacheFallbackIter(it, r, ctx, addrQueryOriginProviders) return iter, nil } @@ -88,6 +89,42 @@ func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it return it, nil } +func (r cachedRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + it, err := r.router.GetClosestPeers(ctx, key) + if err != nil { + return nil, err + } + + return 
r.applyPeerRecordCaching(it, ctx, addrQueryOriginClosestPeers), nil +} + +// applyPeerRecordCaching applies cache fallback logic to a PeerRecord iterator +// by converting to Record iterator, applying caching, and converting back +func (r cachedRouter) applyPeerRecordCaching(it iter.ResultIter[*types.PeerRecord], ctx context.Context, queryOrigin string) iter.ResultIter[*types.PeerRecord] { + // Convert *types.PeerRecord to types.Record + recordIter := iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] { + if v.Err != nil { + return iter.Result[types.Record]{Err: v.Err} + } + return iter.Result[types.Record]{Val: v.Val} + }) + + // Apply caching + cachedIter := NewCacheFallbackIter(recordIter, r, ctx, queryOrigin) + + // Convert back to *types.PeerRecord + return iter.Map(cachedIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] { + if v.Err != nil { + return iter.Result[*types.PeerRecord]{Err: v.Err} + } + peerRec, ok := v.Val.(*types.PeerRecord) + if !ok { + return iter.Result[*types.PeerRecord]{Err: errors.New("unexpected record type")} + } + return iter.Result[*types.PeerRecord]{Val: peerRec} + }) +} + // withAddrsFromCache returns the best list of addrs for specified [peer.ID]. // It will consult cache ONLY if the addrs slice passed to it is empty. func (r cachedRouter) withAddrsFromCache(queryOrigin string, pid peer.ID, addrs []types.Multiaddr) []types.Multiaddr { @@ -116,6 +153,7 @@ type cacheFallbackIter struct { current iter.Result[types.Record] findPeersResult chan types.PeerRecord router cachedRouter + queryOrigin string ctx context.Context cancel context.CancelFunc ongoingLookups atomic.Int32 @@ -123,13 +161,14 @@ type cacheFallbackIter struct { // NewCacheFallbackIter is a wrapper around a results iterator that will resolve peers with no addresses from cache and if no cached addresses, will look them up via FindPeers. 
// It's a bit complex because it ensures we continue iterating without blocking on the FindPeers call. -func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context) *cacheFallbackIter { +func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context, queryOrigin string) *cacheFallbackIter { // Create a cancellable context for this iterator iterCtx, cancel := context.WithCancel(ctx) iter := &cacheFallbackIter{ sourceIter: sourceIter, router: router, + queryOrigin: queryOrigin, ctx: iterCtx, cancel: cancel, findPeersResult: make(chan types.PeerRecord, 100), // Buffer to avoid drops in typical cases @@ -148,7 +187,7 @@ func (it *cacheFallbackIter) Next() bool { switch val.Val.GetSchema() { case types.SchemaPeer: if record, ok := val.Val.(*types.PeerRecord); ok { - record.Addrs = it.router.withAddrsFromCache(addrQueryOriginProviders, *record.ID, record.Addrs) + record.Addrs = it.router.withAddrsFromCache(it.queryOrigin, *record.ID, record.Addrs) if len(record.Addrs) > 0 { it.current = iter.Result[types.Record]{Val: record} return true diff --git a/server_cached_router_test.go b/server_cached_router_test.go index e0ec1c6..e43b703 100644 --- a/server_cached_router_test.go +++ b/server_cached_router_test.go @@ -151,6 +151,79 @@ func TestCachedRouter(t *testing.T) { require.Equal(t, publicAddr.String(), results[0].Addrs[0].String()) }) + t.Run("GetClosestPeers with cached addresses", func(t *testing.T) { + ctx := context.Background() + c := makeCID() + pid := peer.ID("test-peer") + + // Create mock router + mr := &mockRouter{} + mockIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + mr.On("GetClosestPeers", mock.Anything, c).Return(mockIter, nil) + + // Create cached address book with test addresses + cab, err := newCachedAddrBook() + require.NoError(t, err) + + publicAddr := mustMultiaddr(t, 
"/ip4/137.21.14.12/tcp/4001") + cab.addrBook.AddAddrs(pid, []multiaddr.Multiaddr{publicAddr.Multiaddr}, time.Hour) + + // Create cached router + cr := NewCachedRouter(mr, cab) + + it, err := cr.GetClosestPeers(ctx, c) + require.NoError(t, err) + + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 1) + + // Verify cached addresses were added + require.Equal(t, pid, *results[0].ID) + require.Len(t, results[0].Addrs, 1) + require.Equal(t, publicAddr.String(), results[0].Addrs[0].String()) + }) + + t.Run("GetClosestPeers with fallback to FindPeers", func(t *testing.T) { + ctx := context.Background() + c := makeCID() + pid := peer.ID("test-peer") + publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001") + + // Create mock router + mr := &mockRouter{} + getClosestIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}}, + }) + mr.On("GetClosestPeers", mock.Anything, c).Return(getClosestIter, nil) + + findPeersIter := newMockResultIter([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}}, + }) + mr.On("FindPeers", mock.Anything, pid, 1).Return(findPeersIter, nil) + + // Create cached address book with empty cache + cab, err := newCachedAddrBook() + require.NoError(t, err) + + // Create cached router + cr := NewCachedRouter(mr, cab) + + it, err := cr.GetClosestPeers(ctx, c) + require.NoError(t, err) + + results, err := iter.ReadAllResults(it) + require.NoError(t, err) + require.Len(t, results, 1) + + // Verify addresses from FindPeers fallback + require.Equal(t, pid, *results[0].ID) + require.Len(t, results[0].Addrs, 1) + require.Equal(t, publicAddr.String(), results[0].Addrs[0].String()) + }) + } func TestCacheFallbackIter(t *testing.T) { @@ -173,7 +246,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := 
NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Read all results results, err := iter.ReadAllResults(fallbackIter) @@ -204,7 +277,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Read all results results, err := iter.ReadAllResults(fallbackIter) @@ -240,7 +313,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Read all results results, err := iter.ReadAllResults(fallbackIter) @@ -266,7 +339,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Cancel context before sending any values cancel() @@ -293,7 +366,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // First Next() should succeed require.True(t, fallbackIter.Next()) @@ -336,7 +409,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Cancel context during lookup cancel() @@ -364,7 +437,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := 
NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Should still get a result, but with no addresses results, err := iter.ReadAllResults(fallbackIter) @@ -400,7 +473,7 @@ func TestCacheFallbackIter(t *testing.T) { cr := NewCachedRouter(mr, cab) // Create fallback iterator - fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx) + fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown) // Should get all records with addresses results, err := iter.ReadAllResults(fallbackIter) diff --git a/server_dht_test.go b/server_dht_test.go new file mode 100644 index 0000000..3d20452 --- /dev/null +++ b/server_dht_test.go @@ -0,0 +1,382 @@ +package main + +import ( + "context" + "crypto/rand" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/ipfs/boxo/ipns" + "github.com/ipfs/boxo/routing/http/server" + "github.com/ipfs/boxo/routing/http/types" + "github.com/ipfs/boxo/routing/http/types/iter" + "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/libp2p/go-libp2p/core/routing" + "github.com/multiformats/go-multiaddr" + "github.com/stretchr/testify/require" +) + +const ( + mediaTypeJSON = "application/json" + mediaTypeNDJSON = "application/x-ndjson" + + cacheControlShortTTL = "public, max-age=15, stale-while-revalidate=172800, stale-if-error=172800" + cacheControlLongTTL = "public, max-age=300, stale-while-revalidate=172800, stale-if-error=172800" +) + +func makeEd25519PeerID(t *testing.T) (crypto.PrivKey, peer.ID) { + sk, _, err := crypto.GenerateEd25519Key(rand.Reader) + require.NoError(t, err) + + pid, err := peer.IDFromPrivateKey(sk) + require.NoError(t, err) + + return sk, pid +} + +func requireCloseToNow(t *testing.T, lastModified string) { + lastModifiedTime, err := 
time.Parse(http.TimeFormat, lastModified) + require.NoError(t, err) + require.WithinDuration(t, time.Now(), lastModifiedTime, 1*time.Minute) +} + +func makePeerRecords(t *testing.T, count int) ([]iter.Result[*types.PeerRecord], []peer.ID) { + var peerRecords []iter.Result[*types.PeerRecord] + var peerIDs []peer.ID + + for i := 0; i < count; i++ { + _, p := makeEd25519PeerID(t) + peerIDs = append(peerIDs, p) + + addr := fmt.Sprintf("/ip4/127.0.0.%d/tcp/4001", i+1) + ma, err := multiaddr.NewMultiaddr(addr) + require.NoError(t, err) + + peerRecords = append(peerRecords, iter.Result[*types.PeerRecord]{ + Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &p, + Addrs: []types.Multiaddr{{Multiaddr: ma}}, + }, + }) + } + + return peerRecords, peerIDs +} + +func TestGetClosestPeersEndpoint(t *testing.T) { + t.Parallel() + + makeRequest := func(t *testing.T, router router, contentType, key string) *http.Response { + handler := server.Handler(&composableRouter{dht: router}) + srv := httptest.NewServer(handler) + t.Cleanup(srv.Close) + + urlStr := fmt.Sprintf("http://%s/routing/v1/dht/closest/peers/%s", srv.Listener.Addr().String(), key) + + req, err := http.NewRequest(http.MethodGet, urlStr, nil) + require.NoError(t, err) + if contentType != "" { + req.Header.Set("Accept", contentType) + } + resp, err := http.DefaultClient.Do(req) + require.NoError(t, err) + return resp + } + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with 20 peers (JSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + peerRecords, peerIDs := makePeerRecords(t, 20) + results := iter.FromSlice(peerRecords) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if k.Equals(key) { + return results, nil + } + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, mockRouter, mediaTypeJSON, key.String()) + require.Equal(t, http.StatusOK, 
resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, cacheControlLongTTL, resp.Header.Get("Cache-Control")) + + requireCloseToNow(t, resp.Header.Get("Last-Modified")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + bodyStr := string(body) + require.Contains(t, bodyStr, `"Peers":[`) + // Verify all 20 peers and their addresses are present + for i, p := range peerIDs { + require.Contains(t, bodyStr, p.String()) + expectedAddr := fmt.Sprintf("/ip4/127.0.0.%d/tcp/4001", i+1) + require.Contains(t, bodyStr, expectedAddr) + } + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with 20 peers (NDJSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + peerRecords, peerIDs := makePeerRecords(t, 20) + results := iter.FromSlice(peerRecords) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if k.Equals(key) { + return results, nil + } + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, mockRouter, mediaTypeNDJSON, key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) + require.Equal(t, "Accept", resp.Header.Get("Vary")) + require.Equal(t, cacheControlLongTTL, resp.Header.Get("Cache-Control")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + + bodyStr := string(body) + // Verify all 20 peers and their addresses are present + for i, p := range peerIDs { + require.Contains(t, bodyStr, p.String()) + expectedAddr := fmt.Sprintf("/ip4/127.0.0.%d/tcp/4001", i+1) + require.Contains(t, bodyStr, expectedAddr) + } + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with empty results (JSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := 
peer.ToCid(pid) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if k.Equals(key) { + return results, nil + } + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, mockRouter, mediaTypeJSON, key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + require.Equal(t, cacheControlShortTTL, resp.Header.Get("Cache-Control")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, `{"Peers":null}`, string(body)) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 with empty results (NDJSON)", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if k.Equals(key) { + return results, nil + } + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, mockRouter, mediaTypeNDJSON, key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, mediaTypeNDJSON, resp.Header.Get("Content-Type")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, "", string(body)) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{cid} returns 200 when router returns ErrNotFound", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, mockRouter, mediaTypeJSON, key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, 
mediaTypeJSON, resp.Header.Get("Content-Type")) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Equal(t, `{"Peers":null}`, string(body)) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{invalid-key} returns 400", func(t *testing.T) { + t.Parallel() + + mockRouter := &mockDHTRouter{} + + resp := makeRequest(t, mockRouter, mediaTypeJSON, "not-a-valid-cid") + require.Equal(t, http.StatusBadRequest, resp.StatusCode) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{arbitrary-cid} returns 200", func(t *testing.T) { + t.Parallel() + + // arbitrary CID (not a PeerID) + cidStr := "bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei" + key, err := cid.Decode(cidStr) + require.NoError(t, err) + + _, pid := makeEd25519PeerID(t) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Addrs: []types.Multiaddr{}, + }}, + }) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if k.Equals(key) { + return results, nil + } + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, mockRouter, mediaTypeJSON, cidStr) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(body), pid.String()) + }) + + t.Run("GET /routing/v1/dht/closest/peers/{peerid-as-cid} returns 200", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{ + {Val: &types.PeerRecord{ + Schema: types.SchemaPeer, + ID: &pid, + Addrs: []types.Multiaddr{}, + }}, + }) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if k.Equals(key) { + return results, nil + } + return nil, routing.ErrNotFound + }, + } + + resp := makeRequest(t, 
mockRouter, mediaTypeJSON, key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + require.Contains(t, string(body), pid.String()) + }) + + t.Run("GET /routing/v1/dht/closest/peers with default Accept header returns JSON", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + return results, nil + }, + } + + resp := makeRequest(t, mockRouter, "", key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + }) + + t.Run("GET /routing/v1/dht/closest/peers with wildcard Accept header returns JSON", func(t *testing.T) { + t.Parallel() + + _, pid := makeEd25519PeerID(t) + key := peer.ToCid(pid) + + results := iter.FromSlice([]iter.Result[*types.PeerRecord]{}) + + mockRouter := &mockDHTRouter{ + getClosestPeersFunc: func(ctx context.Context, k cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + return results, nil + }, + } + + resp := makeRequest(t, mockRouter, "text/html,*/*", key.String()) + require.Equal(t, http.StatusOK, resp.StatusCode) + require.Equal(t, mediaTypeJSON, resp.Header.Get("Content-Type")) + }) +} + +// mockDHTRouter implements the router interface for testing +type mockDHTRouter struct { + getClosestPeersFunc func(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) +} + +func (m *mockDHTRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { + return nil, routing.ErrNotSupported +} + +func (m *mockDHTRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (iter.ResultIter[*types.PeerRecord], error) { + return nil, routing.ErrNotSupported +} + +func (m *mockDHTRouter) 
GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if m.getClosestPeersFunc != nil { + return m.getClosestPeersFunc(ctx, key) + } + return nil, routing.ErrNotSupported +} + +func (m *mockDHTRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { + return nil, routing.ErrNotSupported +} + +func (m *mockDHTRouter) PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error { + return routing.ErrNotSupported +} diff --git a/server_routers.go b/server_routers.go index 607e971..c919af5 100644 --- a/server_routers.go +++ b/server_routers.go @@ -3,6 +3,7 @@ package main import ( "context" "errors" + "fmt" "reflect" "sync" "time" @@ -12,6 +13,10 @@ import ( "github.com/ipfs/boxo/routing/http/types" "github.com/ipfs/boxo/routing/http/types/iter" "github.com/ipfs/go-cid" + dht "github.com/libp2p/go-libp2p-kad-dht" + "github.com/libp2p/go-libp2p-kad-dht/dual" + "github.com/libp2p/go-libp2p-kad-dht/fullrt" + "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/routing" manet "github.com/multiformats/go-multiaddr/net" @@ -21,6 +26,7 @@ type router interface { providersRouter peersRouter ipnsRouter + dhtRouter } type providersRouter interface { @@ -36,12 +42,17 @@ type ipnsRouter interface { PutIPNS(ctx context.Context, name ipns.Name, record *ipns.Record) error } +type dhtRouter interface { + GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) +} + var _ server.ContentRouter = composableRouter{} type composableRouter struct { providers providersRouter peers peersRouter ipns ipnsRouter + dht dhtRouter } func (r composableRouter) FindProviders(ctx context.Context, key cid.Cid, limit int) (iter.ResultIter[types.Record], error) { @@ -58,6 +69,13 @@ func (r composableRouter) FindPeers(ctx context.Context, pid 
peer.ID, limit int) return r.peers.FindPeers(ctx, pid, limit) } +func (r composableRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + if r.dht == nil { + return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{})), nil + } + return r.dht.GetClosestPeers(ctx, key) +} + func (r composableRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { if r.ipns == nil { return nil, routing.ErrNotFound @@ -128,6 +146,12 @@ func find[T any](ctx context.Context, routers []router, call func(router) (iter. return newManyIter(ctx, its), nil } +func (r parallelRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + return find(ctx, r.routers, func(ri router) (iter.ResultIter[*types.PeerRecord], error) { + return ri.GetClosestPeers(ctx, key) + }) +} + type manyIter[T any] struct { ctx context.Context cancel context.CancelFunc @@ -316,6 +340,7 @@ func (r parallelRouter) ProvideBitswap(ctx context.Context, req *server.BitswapW var _ router = libp2pRouter{} type libp2pRouter struct { + host host.Host routing routing.Routing } @@ -349,6 +374,77 @@ func (d libp2pRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it return iter.ToResultIter(iter.FromSlice([]*types.PeerRecord{rec})), nil } +func (d libp2pRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + // Reject undefined keys: there is no meaningful closest-peers query for cid.Undef. + if key == cid.Undef { + return nil, errors.New("GetClosestPeers: key is undefined") + } + + keyStr := string(key.Hash()) + var peers []peer.ID + var err error + + switch dhtClient := d.routing.(type) { + case *dual.DHT: + // Only use WAN DHT for public HTTP Routing API (same as Kubo) + // LAN DHT contains private network peers that should not be exposed publicly. 
+ if dhtClient.WAN == nil { + return nil, fmt.Errorf("GetClosestPeers not supported: WAN DHT is not available") + } + peers, err = dhtClient.WAN.GetClosestPeers(ctx, keyStr) + if err != nil { + return nil, err + } + case *fullrt.FullRT: + peers, err = dhtClient.GetClosestPeers(ctx, keyStr) + if err != nil { + return nil, err + } + case *dht.IpfsDHT: + peers, err = dhtClient.GetClosestPeers(ctx, keyStr) + if err != nil { + return nil, err + } + case *bundledDHT: + // bundledDHT uses either fullRT (when ready) or standard DHT + // We need to call GetClosestPeers on the active DHT + activeDHT := dhtClient.getDHT() + switch dht := activeDHT.(type) { + case *fullrt.FullRT: + peers, err = dht.GetClosestPeers(ctx, keyStr) + case *dht.IpfsDHT: + peers, err = dht.GetClosestPeers(ctx, keyStr) + default: + return nil, errors.New("bundledDHT returned unexpected DHT type") + } + if err != nil { + return nil, err + } + default: + return nil, errors.New("cannot call GetClosestPeers on DHT implementation") + } + + // We have some DHT-closest peers. Find addresses for them. + // The addresses should be in the peerstore. 
+ var records []*types.PeerRecord + for _, p := range peers { + addrs := d.host.Peerstore().Addrs(p) + rAddrs := make([]types.Multiaddr, len(addrs)) + for i, addr := range addrs { + rAddrs[i] = types.Multiaddr{Multiaddr: addr} + } + record := types.PeerRecord{ + ID: &p, + Schema: types.SchemaPeer, + Addrs: rAddrs, + // Protocols is intentionally left empty: clients only need the ID and any cached addresses. + } + records = append(records, &record) + } + + return iter.ToResultIter(iter.FromSlice(records)), nil +} + func (d libp2pRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -472,6 +568,22 @@ func (r sanitizeRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) ( }), nil } +func (r sanitizeRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + it, err := r.router.GetClosestPeers(ctx, key) + if err != nil { + return nil, err + } + + return iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[*types.PeerRecord] { + if v.Err != nil || v.Val == nil { + return v + } + + v.Val.Addrs = filterPrivateMultiaddr(v.Val.Addrs) + return v + }), nil +} + //lint:ignore SA1019 // ignore staticcheck func (r sanitizeRouter) ProvideBitswap(ctx context.Context, req *server.BitswapWriteProvideRequest) (time.Duration, error) { return 0, routing.ErrNotSupported diff --git a/server_routers_test.go b/server_routers_test.go index 4a92800..8b8ebb8 100644 --- a/server_routers_test.go +++ b/server_routers_test.go @@ -41,6 +41,14 @@ func (m *mockRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (ite return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) } +func (m *mockRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) { + args := m.Called(ctx, key) + if arg0 := args.Get(0); arg0 == nil { + return nil, args.Error(1) + } + return args.Get(0).(iter.ResultIter[*types.PeerRecord]), args.Error(1) +} + func (m 
*mockRouter) GetIPNS(ctx context.Context, name ipns.Name) (*ipns.Record, error) { args := m.Called(ctx, name) if arg0 := args.Get(0); arg0 == nil {