Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@
"it-stream-types": "^2.0.1",
"multiformats": "^12.0.1",
"p-defer": "^4.0.0",
"progress-events": "^1.0.0",
"race-signal": "^1.0.0",
"uint8arraylist": "^2.4.3"
},
Expand Down
16 changes: 11 additions & 5 deletions packages/interface/src/content-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AbortOptions } from '../index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
* Any object that implements this Symbol as a property should return a
Expand All @@ -23,7 +24,12 @@ import type { CID } from 'multiformats/cid'
*/
export const contentRouting = Symbol.for('@libp2p/content-routing')

export interface ContentRouting {
export interface ContentRouting<
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* The implementation of this method should ensure that network peers know the
* caller can provide content that corresponds to the passed CID.
Expand All @@ -35,7 +41,7 @@ export interface ContentRouting {
* await contentRouting.provide(cid)
* ```
*/
provide(cid: CID, options?: AbortOptions): Promise<void>
provide(cid: CID, options?: AbortOptions & ProgressOptions<ProvideProgressEvents>): Promise<void>

/**
* Find the providers of the passed CID.
Expand All @@ -49,7 +55,7 @@ export interface ContentRouting {
* }
* ```
*/
findProviders(cid: CID, options?: AbortOptions): AsyncIterable<PeerInfo>
findProviders(cid: CID, options?: AbortOptions & ProgressOptions<FindProvidersProgressEvents>): AsyncIterable<PeerInfo>

/**
* Puts a value corresponding to the passed key in a way that can later be
Expand All @@ -65,7 +71,7 @@ export interface ContentRouting {
* await contentRouting.put(key, value)
* ```
*/
put(key: Uint8Array, value: Uint8Array, options?: AbortOptions): Promise<void>
put(key: Uint8Array, value: Uint8Array, options?: AbortOptions & ProgressOptions<PutProgressEvents>): Promise<void>

/**
* Retrieves a value from the network corresponding to the passed key.
Expand All @@ -79,5 +85,5 @@ export interface ContentRouting {
* const value = await contentRouting.get(key)
* ```
*/
get(key: Uint8Array, options?: AbortOptions): Promise<Uint8Array>
get(key: Uint8Array, options?: AbortOptions & ProgressOptions<GetProgressEvents>): Promise<Uint8Array>
}
55 changes: 48 additions & 7 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import type { StreamHandler, StreamHandlerOptions } from './stream-handler/index
import type { Topology } from './topology/index.js'
import type { Listener } from './transport/index.js'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressEvent } from 'progress-events'

/**
* Used by the connection manager to sort addresses into order before dialling
Expand Down Expand Up @@ -113,7 +114,15 @@ export interface IdentifyResult {
* Event names are `noun:verb` so the first part is the name of the object
* being acted on and the second is the action.
*/
export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
export interface Libp2pEvents<
Services extends ServiceMap = ServiceMap,
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* This event is dispatched when a new network peer is discovered.
*
Expand Down Expand Up @@ -240,7 +249,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
* })
* ```
*/
'start': CustomEvent<Libp2p<T>>
'start': CustomEvent<Libp2p<
Copy link
Member

@maschad maschad Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is pretty verbose, should we have this as a seperate interface?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a massive fan of how this is implemented so I'm certainly open to suggestions of how to make it simpler.

The idea is to derive the types of content/peer routing progress events you'll get from the config so they can be type safe. I not sure it's reliably possible, and it leads to these sorts of generics contortions - we may be better off just making it untyped.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree there, I don't see any type safety benefits as is, I think we are better off with

export interface Libp2pEvents<
  Services extends ServiceMap = ServiceMap,
  ProgressEvents extends ProgressEvent = ProgressEvent
> {
  'start': CustomEvent<Libp2p<Services, ProgressEvents>>;
  'stop': CustomEvent<Libp2p<Services, ProgressEvents>>;
}

given the DHTContentRouting class already specifies the ProgessEvent types in it's methods.

Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>>

/**
* This event notifies listeners that the node has stopped
Expand All @@ -251,7 +268,15 @@ export interface Libp2pEvents<T extends ServiceMap = ServiceMap> {
* })
* ```
*/
'stop': CustomEvent<Libp2p<T>>
'stop': CustomEvent<Libp2p<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>>
}

/**
Expand Down Expand Up @@ -308,7 +333,23 @@ export interface PendingDial {
/**
* Libp2p nodes implement this interface.
*/
export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, TypedEventTarget<Libp2pEvents<T>> {
export interface Libp2p<
Services extends ServiceMap = ServiceMap,
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent,
ProvideProgressEvents extends ProgressEvent = ProgressEvent,
FindProvidersProgressEvents extends ProgressEvent = ProgressEvent,
PutProgressEvents extends ProgressEvent = ProgressEvent,
GetProgressEvents extends ProgressEvent = ProgressEvent
> extends Startable, TypedEventTarget<Libp2pEvents<
Services,
FindPeerProgressEvents,
GetClosestPeersProgressEvents,
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
>> {
/**
* The PeerId is a unique identifier for a node on the network.
*
Expand Down Expand Up @@ -359,7 +400,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
* }
* ```
*/
peerRouting: PeerRouting
peerRouting: PeerRouting<FindPeerProgressEvents, GetClosestPeersProgressEvents>

/**
* The content routing subsystem allows the user to find providers for content,
Expand All @@ -375,7 +416,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
* }
* ```
*/
contentRouting: ContentRouting
contentRouting: ContentRouting<ProvideProgressEvents, FindProvidersProgressEvents, GetProgressEvents, PutProgressEvents>

/**
* The keychain contains the keys used by the current node, and can create new
Expand Down Expand Up @@ -602,7 +643,7 @@ export interface Libp2p<T extends ServiceMap = ServiceMap> extends Startable, Ty
/**
* A set of user defined services
*/
services: T
services: Services
}

/**
Expand Down
10 changes: 7 additions & 3 deletions packages/interface/src/peer-routing/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { AbortOptions } from '../index.js'
import type { PeerId } from '../peer-id/index.js'
import type { PeerInfo } from '../peer-info/index.js'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

/**
* Any object that implements this Symbol as a property should return a
Expand All @@ -23,7 +24,10 @@ import type { PeerInfo } from '../peer-info/index.js'
*/
export const peerRouting = Symbol.for('@libp2p/peer-routing')

export interface PeerRouting {
export interface PeerRouting<
FindPeerProgressEvents extends ProgressEvent = ProgressEvent,
GetClosestPeersProgressEvents extends ProgressEvent = ProgressEvent
> {
/**
* Searches the network for peer info corresponding to the passed peer id.
*
Expand All @@ -34,7 +38,7 @@ export interface PeerRouting {
* const peer = await peerRouting.findPeer(peerId, options)
* ```
*/
findPeer(peerId: PeerId, options?: AbortOptions): Promise<PeerInfo>
findPeer(peerId: PeerId, options?: AbortOptions & ProgressOptions<FindPeerProgressEvents>): Promise<PeerInfo>

/**
* Search the network for peers that are closer to the passed key. Peer
Expand All @@ -49,5 +53,5 @@ export interface PeerRouting {
* }
* ```
*/
getClosestPeers(key: Uint8Array, options?: AbortOptions): AsyncIterable<PeerInfo>
getClosestPeers(key: Uint8Array, options?: AbortOptions & ProgressOptions<GetClosestPeersProgressEvents>): AsyncIterable<PeerInfo>
}
61 changes: 50 additions & 11 deletions packages/kad-dht/src/dual-kad-dht.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,47 +4,73 @@ import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { type PeerDiscovery, peerDiscovery, type PeerDiscoveryEvents } from '@libp2p/interface/peer-discovery'
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
import { logger } from '@libp2p/logger'
import drain from 'it-drain'
import merge from 'it-merge'
import isPrivate from 'private-ip'
import { CustomProgressEvent } from 'progress-events'
import { DefaultKadDHT } from './kad-dht.js'
import { queryErrorEvent } from './query/events.js'
import type { DualKadDHT, KadDHT, KadDHTComponents, KadDHTInit, QueryEvent, QueryOptions } from './index.js'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerInfo } from '@libp2p/interface/peer-info'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { CID } from 'multiformats/cid'
import type { ProgressEvent, ProgressOptions } from 'progress-events'

export type ProvideProgressEvents =
ProgressEvent<'libp2p:content-routing:provide:dht:event', QueryEvent>

export type FindProvidersProgressEvents =
ProgressEvent<'libp2p:content-routing:find-providers:dht:event', QueryEvent>

export type PutProgressEvents =
ProgressEvent<'libp2p:content-routing:put:dht:event', QueryEvent>

export type GetProgressEvents =
ProgressEvent<'libp2p:content-routing:get:dht:event', QueryEvent>

const log = logger('libp2p:kad-dht')

/**
* Wrapper class to convert events into returned values
*/
class DHTContentRouting implements ContentRouting {
class DHTContentRouting implements ContentRouting<
ProvideProgressEvents,
FindProvidersProgressEvents,
PutProgressEvents,
GetProgressEvents
> {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async provide (cid: CID, options: QueryOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
async provide (cid: CID, options: QueryOptions & ProgressOptions<ProvideProgressEvents> = {}): Promise<void> {
for await (const event of this.dht.provide(cid, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:provide:dht:event', event))
}
}

async * findProviders (cid: CID, options: QueryOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
async * findProviders (cid: CID, options: QueryOptions & ProgressOptions<FindProvidersProgressEvents> = {}): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of this.dht.findProviders(cid, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:find-providers:dht:event', event))

if (event.name === 'PROVIDER') {
yield * event.providers
}
}
}

async put (key: Uint8Array, value: Uint8Array, options?: QueryOptions): Promise<void> {
await drain(this.dht.put(key, value, options))
async put (key: Uint8Array, value: Uint8Array, options: QueryOptions & ProgressOptions<PutProgressEvents> = {}): Promise<void> {
for await (const event of this.dht.put(key, value, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:put:dht:event', event))
}
}

async get (key: Uint8Array, options?: QueryOptions): Promise<Uint8Array> {
async get (key: Uint8Array, options: QueryOptions & ProgressOptions<GetProgressEvents> = {}): Promise<Uint8Array> {
for await (const event of this.dht.get(key, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:content-routing:get:dht:event', event))

if (event.name === 'VALUE') {
return event.value
}
Expand All @@ -54,18 +80,29 @@ class DHTContentRouting implements ContentRouting {
}
}

export type FindPeerProgressEvents =
ProgressEvent<'libp2p:peer-routing:find-peer:dht:event', QueryEvent>

export type GetClosestPeersProgressEvents =
ProgressEvent<'libp2p:peer-routing:get-closest-peers:dht:event', QueryEvent>

/**
* Wrapper class to convert events into returned values
*/
class DHTPeerRouting implements PeerRouting {
class DHTPeerRouting implements PeerRouting<
FindPeerProgressEvents,
GetClosestPeersProgressEvents
> {
private readonly dht: KadDHT

constructor (dht: KadDHT) {
this.dht = dht
}

async findPeer (peerId: PeerId, options: QueryOptions = {}): Promise<PeerInfo> {
async findPeer (peerId: PeerId, options: QueryOptions & ProgressOptions<FindPeerProgressEvents> = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:find-peer:dht:event', event))

if (event.name === 'FINAL_PEER') {
return event.peer
}
Expand All @@ -74,8 +111,10 @@ class DHTPeerRouting implements PeerRouting {
throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: QueryOptions = {}): AsyncIterable<PeerInfo> {
async * getClosestPeers (key: Uint8Array, options: QueryOptions & ProgressOptions<GetClosestPeersProgressEvents> = {}): AsyncIterable<PeerInfo> {
for await (const event of this.dht.getClosestPeers(key, options)) {
options.onProgress?.(new CustomProgressEvent('libp2p:peer-routing:get-closest-peers:dht:event', event))

if (event.name === 'FINAL_PEER') {
yield event.peer
}
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"p-queue": "^7.3.4",
"p-retry": "^6.0.0",
"private-ip": "^3.0.0",
"progress-events": "^1.0.0",
"protons-runtime": "^5.0.0",
"rate-limiter-flexible": "^3.0.0",
"uint8arraylist": "^2.4.3",
Expand Down
Loading