Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ The following emojis are used to highlight certain changes:
- AutoConf support: automatic configuration of bootstrap peers and delegated routing endpoints ([#123](https://github.com/ipfs/someguy/pull/123)). When enabled (default), the `auto` placeholder is replaced with network-recommended values.
- All endpoint flags (`--provider-endpoints`, `--peer-endpoints`, `--ipns-endpoints`) default to `auto`
- See [environment-variables.md](docs/environment-variables.md#someguy_autoconf) for configuration details
- Added `/routing/v1/peers/closest/{key}` endpoint implementing [IPIP-476](https://github.com/ipfs/specs/pull/476)
- Returns DHT-closest peers to a given CID or PeerID
- Accepts both CID and legacy PeerID formats (e.g., `12D3KooW...`)
- Uses WAN DHT only for more reliable results
- Includes cached addresses in results when available

### Changed

Expand Down
41 changes: 41 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,47 @@ func findPeers(ctx context.Context, pid peer.ID, endpoint string, prettyOutput b
return nil
}

func getClosestPeers(ctx context.Context, key cid.Cid, endpoint string, prettyOutput bool) error {
drc, err := newDelegatedRoutingClient(endpoint)
if err != nil {
return err
}

recordsIter, err := drc.GetClosestPeers(ctx, key)
if err != nil {
return err
}
defer recordsIter.Close()

for recordsIter.Next() {
res := recordsIter.Val()

// Check for error, but do not complain if we exceeded the timeout. We are
// expecting that to happen: we explicitly defined a timeout.
if res.Err != nil {
if !errors.Is(res.Err, context.DeadlineExceeded) {
return res.Err
}

return nil
}

if prettyOutput {
fmt.Fprintln(os.Stdout, res.Val.ID)
fmt.Fprintln(os.Stdout, "\tProtocols:", res.Val.Protocols)
fmt.Fprintln(os.Stdout, "\tAddresses:", res.Val.Addrs)
fmt.Fprintln(os.Stdout)
} else {
err := json.NewEncoder(os.Stdout).Encode(res.Val)
if err != nil {
return err
}
}
}

return nil
}

func getIPNS(ctx context.Context, name ipns.Name, endpoint string, prettyOutput bool) error {
drc, err := newDelegatedRoutingClient(endpoint)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/dustin/go-humanize v1.0.1
github.com/felixge/httpsnoop v1.0.4
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/ipfs/boxo v0.35.2
github.com/ipfs/boxo v0.35.3-0.20251118170232-e71f50ea2263
github.com/ipfs/go-cid v0.5.0
github.com/ipfs/go-log/v2 v2.9.0
github.com/libp2p/go-libp2p v0.45.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ github.com/huin/goupnp v1.3.0 h1:UvLUlWDNpoUdYzb2TCn+MuTWtcjXKSza2n6CBdQ0xXc=
github.com/huin/goupnp v1.3.0/go.mod h1:gnGPsThkYa7bFi/KWmEysQRf48l2dvR5bxr2OFckNX8=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/ipfs/boxo v0.35.2 h1:0QZJJh6qrak28abENOi5OA8NjBnZM4p52SxeuIDqNf8=
github.com/ipfs/boxo v0.35.2/go.mod h1:bZn02OFWwJtY8dDW9XLHaki59EC5o+TGDECXEbe1w8U=
github.com/ipfs/boxo v0.35.3-0.20251118170232-e71f50ea2263 h1:7sSi4euS5Rb+RwQZOXrd/fURpC9kgbESD4DPykaLy0I=
github.com/ipfs/boxo v0.35.3-0.20251118170232-e71f50ea2263/go.mod h1:bZn02OFWwJtY8dDW9XLHaki59EC5o+TGDECXEbe1w8U=
github.com/ipfs/go-block-format v0.2.3 h1:mpCuDaNXJ4wrBJLrtEaGFGXkferrw5eqVvzaHhtFKQk=
github.com/ipfs/go-block-format v0.2.3/go.mod h1:WJaQmPAKhD3LspLixqlqNFxiZ3BZ3xgqxxoSR/76pnA=
github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
Expand Down
41 changes: 41 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,22 @@ func main() {
return findPeers(ctx.Context, pid, endPoint, ctx.Bool("pretty"))
},
},
{
Name: "getclosestpeers",
Usage: "getclosestpeers <key>",
UsageText: "Find DHT-closest peers to a key (CID or peer ID)",
Action: func(ctx *cli.Context) error {
if ctx.NArg() != 1 {
return errors.New("invalid command, see help")
}
keyStr := ctx.Args().Get(0)
c, err := parseKey(keyStr)
if err != nil {
return err
}
return getClosestPeers(ctx.Context, c, ctx.String("endpoint"), ctx.Bool("pretty"))
},
},
{
Name: "getipns",
Usage: "getipns <ipns-id>",
Expand Down Expand Up @@ -413,3 +429,28 @@ func printIfListConfigured(message string, list []string) {
fmt.Printf(message+"%v\n", strings.Join(list, ", "))
}
}

// parseKey parses a string that can be either a CID or a PeerID.
// It accepts the following formats:
// - Arbitrary CIDs (e.g., bafkreidcd7frenco2m6ch7mny63wztgztv3q6fctaffgowkro6kljre5ei)
// - CIDv1 with libp2p-key codec (e.g., bafzaajaiaejca...)
// - Base58-encoded PeerIDs (e.g., 12D3KooW... or QmYyQ...)
//
// Returns the key as a CID. PeerIDs are converted to CIDv1 with libp2p-key codec.
func parseKey(keyStr string) (cid.Cid, error) {
// Try parsing as PeerID first using peer.Decode
// This handles legacy PeerID formats per:
// https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation
pid, pidErr := peer.Decode(keyStr)
if pidErr == nil {
return peer.ToCid(pid), nil
}

// Fall back to parsing as CID (handles arbitrary CIDs and CIDv1 libp2p-key format)
c, cidErr := cid.Parse(keyStr)
if cidErr == nil {
return c, nil
}

return cid.Cid{}, fmt.Errorf("unable to parse as CID or PeerID: %w", errors.Join(cidErr, pidErr))
}
55 changes: 47 additions & 8 deletions server_cached_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ const (
addrCacheStateHit = "hit"
addrCacheStateMiss = "miss"

// source=providers|peers indicates if query originated from provider or peer endpoint
addrQueryOriginLabel = "origin"
addrQueryOriginProviders = "providers"
addrQueryOriginPeers = "peers"
addrQueryOriginUnknown = "unknown"
// source=providers|peers|closest indicates if query originated from provider, peer, or closest peers endpoint
addrQueryOriginLabel = "origin"
addrQueryOriginProviders = "providers"
addrQueryOriginPeers = "peers"
addrQueryOriginClosestPeers = "closest"
addrQueryOriginUnknown = "unknown"

DispatchedFindPeersTimeout = time.Minute
)
Expand All @@ -64,7 +65,7 @@ func (r cachedRouter) FindProviders(ctx context.Context, key cid.Cid, limit int)
return nil, err
}

iter := NewCacheFallbackIter(it, r, ctx)
iter := NewCacheFallbackIter(it, r, ctx, addrQueryOriginProviders)
return iter, nil
}

Expand All @@ -88,6 +89,42 @@ func (r cachedRouter) FindPeers(ctx context.Context, pid peer.ID, limit int) (it
return it, nil
}

func (r cachedRouter) GetClosestPeers(ctx context.Context, key cid.Cid) (iter.ResultIter[*types.PeerRecord], error) {
it, err := r.router.GetClosestPeers(ctx, key)
if err != nil {
return nil, err
}

return r.applyPeerRecordCaching(it, ctx, addrQueryOriginClosestPeers), nil
}

// applyPeerRecordCaching applies cache fallback logic to a PeerRecord iterator
// by converting to Record iterator, applying caching, and converting back
func (r cachedRouter) applyPeerRecordCaching(it iter.ResultIter[*types.PeerRecord], ctx context.Context, queryOrigin string) iter.ResultIter[*types.PeerRecord] {
// Convert *types.PeerRecord to types.Record
recordIter := iter.Map(it, func(v iter.Result[*types.PeerRecord]) iter.Result[types.Record] {
if v.Err != nil {
return iter.Result[types.Record]{Err: v.Err}
}
return iter.Result[types.Record]{Val: v.Val}
})

// Apply caching
cachedIter := NewCacheFallbackIter(recordIter, r, ctx, queryOrigin)

// Convert back to *types.PeerRecord
return iter.Map(cachedIter, func(v iter.Result[types.Record]) iter.Result[*types.PeerRecord] {
if v.Err != nil {
return iter.Result[*types.PeerRecord]{Err: v.Err}
}
peerRec, ok := v.Val.(*types.PeerRecord)
if !ok {
return iter.Result[*types.PeerRecord]{Err: errors.New("unexpected record type")}
}
return iter.Result[*types.PeerRecord]{Val: peerRec}
})
}

// withAddrsFromCache returns the best list of addrs for specified [peer.ID].
// It will consult cache ONLY if the addrs slice passed to it is empty.
func (r cachedRouter) withAddrsFromCache(queryOrigin string, pid peer.ID, addrs []types.Multiaddr) []types.Multiaddr {
Expand Down Expand Up @@ -116,20 +153,22 @@ type cacheFallbackIter struct {
current iter.Result[types.Record]
findPeersResult chan types.PeerRecord
router cachedRouter
queryOrigin string
ctx context.Context
cancel context.CancelFunc
ongoingLookups atomic.Int32
}

// NewCacheFallbackIter is a wrapper around a results iterator that will resolve peers with no addresses from cache and if no cached addresses, will look them up via FindPeers.
// It's a bit complex because it ensures we continue iterating without blocking on the FindPeers call.
func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context) *cacheFallbackIter {
func NewCacheFallbackIter(sourceIter iter.ResultIter[types.Record], router cachedRouter, ctx context.Context, queryOrigin string) *cacheFallbackIter {
// Create a cancellable context for this iterator
iterCtx, cancel := context.WithCancel(ctx)

iter := &cacheFallbackIter{
sourceIter: sourceIter,
router: router,
queryOrigin: queryOrigin,
ctx: iterCtx,
cancel: cancel,
findPeersResult: make(chan types.PeerRecord, 100), // Buffer to avoid drops in typical cases
Expand All @@ -148,7 +187,7 @@ func (it *cacheFallbackIter) Next() bool {
switch val.Val.GetSchema() {
case types.SchemaPeer:
if record, ok := val.Val.(*types.PeerRecord); ok {
record.Addrs = it.router.withAddrsFromCache(addrQueryOriginProviders, *record.ID, record.Addrs)
record.Addrs = it.router.withAddrsFromCache(it.queryOrigin, *record.ID, record.Addrs)
if len(record.Addrs) > 0 {
it.current = iter.Result[types.Record]{Val: record}
return true
Expand Down
89 changes: 81 additions & 8 deletions server_cached_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,79 @@ func TestCachedRouter(t *testing.T) {
require.Equal(t, publicAddr.String(), results[0].Addrs[0].String())
})

t.Run("GetClosestPeers with cached addresses", func(t *testing.T) {
ctx := context.Background()
c := makeCID()
pid := peer.ID("test-peer")

// Create mock router
mr := &mockRouter{}
mockIter := newMockResultIter([]iter.Result[*types.PeerRecord]{
{Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}},
})
mr.On("GetClosestPeers", mock.Anything, c).Return(mockIter, nil)

// Create cached address book with test addresses
cab, err := newCachedAddrBook()
require.NoError(t, err)

publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001")
cab.addrBook.AddAddrs(pid, []multiaddr.Multiaddr{publicAddr.Multiaddr}, time.Hour)

// Create cached router
cr := NewCachedRouter(mr, cab)

it, err := cr.GetClosestPeers(ctx, c)
require.NoError(t, err)

results, err := iter.ReadAllResults(it)
require.NoError(t, err)
require.Len(t, results, 1)

// Verify cached addresses were added
require.Equal(t, pid, *results[0].ID)
require.Len(t, results[0].Addrs, 1)
require.Equal(t, publicAddr.String(), results[0].Addrs[0].String())
})

t.Run("GetClosestPeers with fallback to FindPeers", func(t *testing.T) {
ctx := context.Background()
c := makeCID()
pid := peer.ID("test-peer")
publicAddr := mustMultiaddr(t, "/ip4/137.21.14.12/tcp/4001")

// Create mock router
mr := &mockRouter{}
getClosestIter := newMockResultIter([]iter.Result[*types.PeerRecord]{
{Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: nil}},
})
mr.On("GetClosestPeers", mock.Anything, c).Return(getClosestIter, nil)

findPeersIter := newMockResultIter([]iter.Result[*types.PeerRecord]{
{Val: &types.PeerRecord{Schema: "peer", ID: &pid, Addrs: []types.Multiaddr{publicAddr}}},
})
mr.On("FindPeers", mock.Anything, pid, 1).Return(findPeersIter, nil)

// Create cached address book with empty cache
cab, err := newCachedAddrBook()
require.NoError(t, err)

// Create cached router
cr := NewCachedRouter(mr, cab)

it, err := cr.GetClosestPeers(ctx, c)
require.NoError(t, err)

results, err := iter.ReadAllResults(it)
require.NoError(t, err)
require.Len(t, results, 1)

// Verify addresses from FindPeers fallback
require.Equal(t, pid, *results[0].ID)
require.Len(t, results[0].Addrs, 1)
require.Equal(t, publicAddr.String(), results[0].Addrs[0].String())
})

}

func TestCacheFallbackIter(t *testing.T) {
Expand All @@ -173,7 +246,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Read all results
results, err := iter.ReadAllResults(fallbackIter)
Expand Down Expand Up @@ -204,7 +277,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Read all results
results, err := iter.ReadAllResults(fallbackIter)
Expand Down Expand Up @@ -240,7 +313,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Read all results
results, err := iter.ReadAllResults(fallbackIter)
Expand All @@ -266,7 +339,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Cancel context before sending any values
cancel()
Expand All @@ -293,7 +366,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// First Next() should succeed
require.True(t, fallbackIter.Next())
Expand Down Expand Up @@ -336,7 +409,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Cancel context during lookup
cancel()
Expand Down Expand Up @@ -364,7 +437,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Should still get a result, but with no addresses
results, err := iter.ReadAllResults(fallbackIter)
Expand Down Expand Up @@ -400,7 +473,7 @@ func TestCacheFallbackIter(t *testing.T) {
cr := NewCachedRouter(mr, cab)

// Create fallback iterator
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx)
fallbackIter := NewCacheFallbackIter(sourceIter, cr, ctx, addrQueryOriginUnknown)

// Should get all records with addresses
results, err := iter.ReadAllResults(fallbackIter)
Expand Down
Loading