Skip to content
Open
122 changes: 122 additions & 0 deletions examples/floodsub/basic_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
#!/usr/bin/env python3
"""
Basic FloodSub Example

This is a simple example that demonstrates FloodSub publishing and subscribing
without relying on test utilities. It shows the core functionality.

Run this example with:
python examples/floodsub/basic_example.py
"""

import asyncio
import logging
import sys

import trio

from libp2p import new_host
from libp2p.crypto.secp256k1 import create_new_key_pair
from libp2p.pubsub.floodsub import FloodSub
from libp2p.pubsub.pubsub import Pubsub
from libp2p.tools.async_service import background_trio_service
from libp2p.tools.constants import FLOODSUB_PROTOCOL_ID

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("floodsub_basic")


async def main() -> None:
"""Main function demonstrating basic FloodSub functionality."""
logger.info("Starting basic FloodSub example...")

# Create two hosts
key_pair1 = create_new_key_pair()
key_pair2 = create_new_key_pair()

host1 = new_host(
key_pair=key_pair1,
listen_addrs=["/ip4/127.0.0.1/tcp/0"],
)

host2 = new_host(
key_pair=key_pair2,
listen_addrs=["/ip4/127.0.0.1/tcp/0"],
)

# Create FloodSub routers
floodsub1 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])
floodsub2 = FloodSub(protocols=[FLOODSUB_PROTOCOL_ID])

# Create Pubsub instances
pubsub1 = Pubsub(
host=host1,
router=floodsub1,
strict_signing=False, # Disable for simplicity
)

pubsub2 = Pubsub(
host=host2,
router=floodsub2,
strict_signing=False, # Disable for simplicity
)

# Start both pubsub services
async with background_trio_service(pubsub1):
async with background_trio_service(pubsub2):
await pubsub1.wait_until_ready()
await pubsub2.wait_until_ready()

logger.info(f"Host 1 ID: {host1.get_id()}")
logger.info(f"Host 2 ID: {host2.get_id()}")

# Start listening on both hosts
logger.info("Starting hosts...")
await host1.get_network().listen()
await host2.get_network().listen()
await trio.sleep(0.5) # Wait for hosts to start listening

# Connect the hosts
logger.info("Connecting hosts...")
await host1.connect(host2.get_id(), host2.get_addrs())
await trio.sleep(1) # Wait for connection

# Subscribe to topic on host2
topic = "test-topic"
logger.info(f"Subscribing to topic: {topic}")
subscription = await pubsub2.subscribe(topic)
await trio.sleep(0.5) # Wait for subscription to propagate

# Publish messages from host1
messages = [
"Hello from FloodSub!",
"This is message number 2",
"FloodSub is working great!"
]

for i, message in enumerate(messages):
logger.info(f"Publishing message {i+1}: {message}")
await pubsub1.publish(topic, message.encode())
await trio.sleep(0.5)

# Receive messages on host2
logger.info("Receiving messages...")
for i in range(len(messages)):
message = await subscription.get()
logger.info(f"Received message {i+1}: {message.data.decode()}")
logger.info(f" From peer: {message.from_id.hex()}")
logger.info(f" Topics: {message.topicIDs}")

logger.info("Basic FloodSub example completed successfully!")


if __name__ == "__main__":
try:
trio.run(main)
except KeyboardInterrupt:
logger.info("Example interrupted by user")
sys.exit(0)
except Exception as e:
logger.error(f"Example failed: {e}")
sys.exit(1)
76 changes: 74 additions & 2 deletions libp2p/kad_dht/kad_dht.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,20 @@ async def handle_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
target_key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for FIND_NODE in KadDHT, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for FIND_NODE in KadDHT", len(closest_peers))

logger.debug(f"Found {len(closest_peers)} peers close to target")

# Consume the source signed_peer_record if sent
Expand Down Expand Up @@ -459,6 +473,20 @@ async def handle_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for provider response, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for provider response", len(closest_peers))

logger.debug(
f"No providers found, including {len(closest_peers)}"
"closest peers"
Expand Down Expand Up @@ -550,6 +578,20 @@ async def handle_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for GET_VALUE response, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for GET_VALUE response", len(closest_peers))

logger.debug(
"No value found,"
f"including {len(closest_peers)} closest peers"
Expand Down Expand Up @@ -677,9 +719,24 @@ async def put_value(self, key: bytes, value: bytes) -> None:
)

# 2. Get closest peers, excluding self
routing_table_peers = self.routing_table.find_local_closest_peers(key)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(routing_table_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for put_value, using connected peers as fallback",
len(routing_table_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as fallback
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)
routing_table_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for put_value", len(routing_table_peers))

closest_peers = [
peer
for peer in self.routing_table.find_local_closest_peers(key)
for peer in routing_table_peers
if peer != self.local_peer_id
]
logger.debug(f"Found {len(closest_peers)} peers to store value at")
Expand Down Expand Up @@ -722,9 +779,24 @@ async def get_value(self, key: bytes) -> bytes | None:
return value

# 2. Get closest peers, excluding self
routing_table_peers = self.routing_table.find_local_closest_peers(key)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(routing_table_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for get_value, using connected peers as fallback",
len(routing_table_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as fallback
from .utils import sort_peer_ids_by_distance
fallback_peers = sort_peer_ids_by_distance(key, connected_peers)
routing_table_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for get_value", len(routing_table_peers))

closest_peers = [
peer
for peer in self.routing_table.find_local_closest_peers(key)
for peer in routing_table_peers
if peer != self.local_peer_id
]
logger.debug(f"Searching {len(closest_peers)} peers for value")
Expand Down
25 changes: 25 additions & 0 deletions libp2p/kad_dht/peer_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ async def find_closest_peers_network(
# Start with closest peers from our routing table
closest_peers = self.routing_table.find_local_closest_peers(target_key, count)
logger.debug("Local closest peers: %d found", len(closest_peers))

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d), using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as initial query targets
fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:count]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback", len(closest_peers))

queried_peers: set[ID] = set()
rounds = 0

Expand Down Expand Up @@ -387,6 +400,18 @@ async def _handle_kad_stream(self, stream: INetStream) -> None:
closest_peers = self.routing_table.find_local_closest_peers(
target_key, 20
)

# Fallback to connected peers if routing table has insufficient peers
MIN_PEERS_THRESHOLD = 5 # Configurable minimum
if len(closest_peers) < MIN_PEERS_THRESHOLD:
logger.debug("Routing table has insufficient peers (%d < %d) for FIND_NODE response, using connected peers as fallback",
len(closest_peers), MIN_PEERS_THRESHOLD)
connected_peers = self.host.get_connected_peers()
if connected_peers:
# Sort connected peers by distance to target and use as response
fallback_peers = sort_peer_ids_by_distance(target_key, connected_peers)[:20]
closest_peers = fallback_peers
logger.debug("Using %d connected peers as fallback for FIND_NODE response", len(closest_peers))

# Create protobuf response
response = Message()
Expand Down
81 changes: 47 additions & 34 deletions libp2p/pubsub/gossipsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ class GossipSub(IPubsubRouter, Service):
prune_back_off: int
unsubscribe_back_off: int

flood_publish: bool

def __init__(
self,
protocols: Sequence[TProtocol],
Expand All @@ -116,6 +118,7 @@ def __init__(
px_peers_count: int = 16,
prune_back_off: int = 60,
unsubscribe_back_off: int = 10,
flood_publish: bool = False,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
Expand Down Expand Up @@ -156,6 +159,8 @@ def __init__(
self.prune_back_off = prune_back_off
self.unsubscribe_back_off = unsubscribe_back_off

self.flood_publish = flood_publish

async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
Expand Down Expand Up @@ -300,42 +305,50 @@ def _get_peers_to_send(
if topic not in self.pubsub.peer_topics:
continue

# direct peers
_direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
send_to.update(_direct_peers)

# floodsub peers
floodsub_peers: set[ID] = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
}
send_to.update(floodsub_peers)

# gossipsub peers
gossipsub_peers: set[ID] = set()
if topic in self.mesh:
gossipsub_peers = self.mesh[topic]
if self.flood_publish and msg_forwarder == self.pubsub.my_id:
for peer in self.pubsub.peer_topics[topic]:
# TODO: add score threshold check when peer scoring is implemented
# if direct peer then skip score check
send_to.add(peer)
else:
# When we publish to a topic that we have not subscribe to, we randomly
# pick `self.degree` number of peers who have subscribed to the topic
# and add them as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: set[ID] = self.fanout[topic] if topic_in_fanout else set()
fanout_size = len(fanout_peers)
if not topic_in_fanout or (
topic_in_fanout and fanout_size < self.degree
):
if topic in self.pubsub.peer_topics:
# Combine fanout peers with selected peers
fanout_peers.update(
self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
# direct peers
direct_peers: set[ID] = {_peer for _peer in self.direct_peers}
send_to.update(direct_peers)

# floodsub peers
floodsub_peers: set[ID] = {
peer_id
for peer_id in self.pubsub.peer_topics[topic]
if self.peer_protocol[peer_id] == floodsub.PROTOCOL_ID
}
send_to.update(floodsub_peers)

# gossipsub peers
gossipsub_peers: set[ID] = set()
if topic in self.mesh:
gossipsub_peers = self.mesh[topic]
else:
# When we publish to a topic that we have not subscribe to, we
# randomly pick `self.degree` number of peers who have subscribed
# to the topic and add them as our `fanout` peers.
topic_in_fanout: bool = topic in self.fanout
fanout_peers: set[ID] = (
self.fanout[topic] if topic_in_fanout else set()
)
fanout_size = len(fanout_peers)
if not topic_in_fanout or (
topic_in_fanout and fanout_size < self.degree
):
if topic in self.pubsub.peer_topics:
# Combine fanout peers with selected peers
fanout_peers.update(
self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree - fanout_size, fanout_peers
)
)
)
self.fanout[topic] = fanout_peers
gossipsub_peers = fanout_peers
send_to.update(gossipsub_peers)
self.fanout[topic] = fanout_peers
gossipsub_peers = fanout_peers
send_to.update(gossipsub_peers)
# Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin])

Expand Down
1 change: 1 addition & 0 deletions libp2p/tools/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class GossipsubParams(NamedTuple):
px_peers_count: int = 16
prune_back_off: int = 60
unsubscribe_back_off: int = 10
flood_publish: bool = False


GOSSIPSUB_PARAMS = GossipsubParams()
1 change: 1 addition & 0 deletions newsfragments/713.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added flood publishing.
Loading