Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
14 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 125 additions & 21 deletions libp2p/pubsub/gossipsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
)
import logging
import random
import statistics
import time
from typing import (
Any,
Expand All @@ -22,14 +23,11 @@
MessageID,
TProtocol,
)
from libp2p.peer.envelope import consume_envelope
from libp2p.peer.id import (
ID,
)
from libp2p.peer.peerinfo import (
PeerInfo,
peer_info_from_bytes,
peer_info_to_bytes,
)
from libp2p.peer.peerinfo import PeerInfo
from libp2p.peer.peerstore import (
PERMANENT_ADDR_TTL,
env_to_send_in_RPC,
Expand All @@ -54,6 +52,10 @@
from .pubsub import (
Pubsub,
)
from .score import (
PeerScorer,
ScoreParams,
)
from .utils import (
parse_message_id_safe,
safe_parse_message_id,
Expand Down Expand Up @@ -116,6 +118,7 @@ def __init__(
px_peers_count: int = 16,
prune_back_off: int = 60,
unsubscribe_back_off: int = 10,
score_params: ScoreParams | None = None,
) -> None:
self.protocols = list(protocols)
self.pubsub = None
Expand Down Expand Up @@ -156,6 +159,9 @@ def __init__(
self.prune_back_off = prune_back_off
self.unsubscribe_back_off = unsubscribe_back_off

# Scoring
self.scorer: PeerScorer | None = PeerScorer(score_params or ScoreParams())

async def run(self) -> None:
self.manager.run_daemon_task(self.heartbeat)
if len(self.direct_peers) > 0:
Expand Down Expand Up @@ -275,6 +281,11 @@ async def publish(self, msg_forwarder: ID, pubsub_msg: rpc_pb2.Message) -> None:
raise NoPubsubAttached
if peer_id not in self.pubsub.peers:
continue
# Publish gate
if self.scorer is not None and not self.scorer.allow_publish(
peer_id, list(pubsub_msg.topicIDs)
):
continue
stream = self.pubsub.peers[peer_id]

# TODO: Go use `sendRPC`, which possibly piggybacks gossip/control messages.
Expand Down Expand Up @@ -335,7 +346,17 @@ def _get_peers_to_send(
)
self.fanout[topic] = fanout_peers
gossipsub_peers = fanout_peers
send_to.update(gossipsub_peers)
# Apply gossip score gate
if self.scorer is not None and gossipsub_peers:
allowed = {
p
for p in gossipsub_peers
if self.scorer.allow_gossip(p, [topic])
and not self.scorer.is_graylisted(p, [topic])
}
send_to.update(allowed)
else:
send_to.update(gossipsub_peers)
# Excludes `msg_forwarder` and `origin`
yield from send_to.difference([msg_forwarder, origin])

Expand Down Expand Up @@ -498,6 +519,10 @@ async def heartbeat(self) -> None:

self.mcache.shift()

# scorer decay step
if self.scorer is not None:
self.scorer.on_heartbeat()

await trio.sleep(self.heartbeat_interval)

async def direct_connect_heartbeat(self) -> None:
Expand Down Expand Up @@ -546,6 +571,25 @@ def mesh_heartbeat(
# Emit GRAFT(topic) control message to peer
peers_to_graft[peer].append(topic)

# Opportunistic grafting based on median scores
if self.scorer is not None and num_mesh_peers_in_topic >= self.degree_low:
try:
scorer = self.scorer
scores = [scorer.score(p, [topic]) for p in self.mesh[topic]]
if scores:
median_score = statistics.median(scores)
# Find higher-than-median peers outside mesh
candidates = self._get_in_topic_gossipsub_peers_from_minus(
topic, self.degree, self.mesh[topic], True
)
for cand in candidates:
if scorer.score(cand, [topic]) > median_score:
self.mesh[topic].add(cand)
peers_to_graft[cand].append(topic)
break
except Exception:
pass

if num_mesh_peers_in_topic > self.degree_high:
# Select |mesh[topic]| - D peers from mesh[topic]
selected_peers = self.select_from_minus(
Expand All @@ -554,6 +598,8 @@ def mesh_heartbeat(
for peer in selected_peers:
# Remove peer from mesh[topic]
self.mesh[topic].discard(peer)
if self.scorer is not None:
self.scorer.on_leave_mesh(peer, topic)

# Emit PRUNE(topic) control message to peer
peers_to_prune[peer].append(topic)
Expand Down Expand Up @@ -759,18 +805,54 @@ async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
continue

try:
peer_info = peer_info_from_bytes(peer.signedPeerRecord)
try:
# Validate signed peer record if provided;
# otherwise try to connect directly
if peer.HasField("signedPeerRecord") and len(peer.signedPeerRecord) > 0:
# Validate envelope signature and freshness via peerstore consume
if self.pubsub is None:
raise NoPubsubAttached
await self.pubsub.host.connect(peer_info)
except Exception as e:
logger.warning(
"failed to connect to px peer %s: %s",
peer_id,
e,

envelope, record = consume_envelope(
peer.signedPeerRecord, "libp2p-peer-record"
)
continue

# Ensure the record matches the advertised peer id
if record.peer_id != peer_id:
raise ValueError("peer id mismatch in PX signed record")

# Store into peerstore and update addrs
self.pubsub.host.get_peerstore().consume_peer_record(
envelope, ttl=7200
)

peer_info = PeerInfo(record.peer_id, record.addrs)
try:
await self.pubsub.host.connect(peer_info)
except Exception as e:
logger.warning(
"failed to connect to px peer %s: %s",
peer_id,
e,
)
continue
else:
# No signed record available; try to use existing connection info
if self.pubsub is None:
raise NoPubsubAttached

try:
# Try to get existing peer info from peerstore
existing_peer_info = self.pubsub.host.get_peerstore().peer_info(
peer_id
)
await self.pubsub.host.connect(existing_peer_info)
except Exception as e:
logger.debug(
"peer %s not found in peerstore or connection failed: %s",
peer_id,
e,
)
continue
except Exception as e:
logger.warning(
"failed to parse peer info from px peer %s: %s",
Expand Down Expand Up @@ -862,6 +944,12 @@ async def handle_graft(
) -> None:
topic: str = graft_msg.topicID

# Score gate for GRAFT acceptance
if self.scorer is not None:
if self.scorer.is_graylisted(sender_peer_id, [topic]):
await self.emit_prune(topic, sender_peer_id, False, False)
return

# Add peer to mesh for topic
if topic in self.mesh:
for direct_peer in self.direct_peers:
Expand All @@ -884,6 +972,8 @@ async def handle_graft(

if sender_peer_id not in self.mesh[topic]:
self.mesh[topic].add(sender_peer_id)
if self.scorer is not None:
self.scorer.on_join_mesh(sender_peer_id, topic)
else:
# Respond with PRUNE if not subscribed to the topic
await self.emit_prune(topic, sender_peer_id, self.do_px, False)
Expand All @@ -905,9 +995,16 @@ async def handle_prune(
self._add_back_off(sender_peer_id, topic, False)

self.mesh[topic].discard(sender_peer_id)
if self.scorer is not None:
self.scorer.on_leave_mesh(sender_peer_id, topic)

if px_peers:
await self._do_px(px_peers)
# Score-gate PX acceptance
allow_px = True
if self.scorer is not None:
allow_px = self.scorer.allow_px_from(sender_peer_id, [topic])
if allow_px:
await self._do_px(px_peers)

# RPC emitters

Expand Down Expand Up @@ -977,11 +1074,18 @@ async def emit_prune(
for peer in exchange_peers:
if self.pubsub is None:
raise NoPubsubAttached
peer_info = self.pubsub.host.get_peerstore().peer_info(peer)
signed_peer_record: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
signed_peer_record.peerID = peer.to_bytes()
signed_peer_record.signedPeerRecord = peer_info_to_bytes(peer_info)
prune_msg.peers.append(signed_peer_record)

# Try to get the signed peer record envelope from peerstore
envelope = self.pubsub.host.get_peerstore().get_peer_record(peer)
peer_info_msg: rpc_pb2.PeerInfo = rpc_pb2.PeerInfo()
peer_info_msg.peerID = peer.to_bytes()

if envelope is not None:
# Use the signed envelope
peer_info_msg.signedPeerRecord = envelope.marshal_envelope()
# If no signed record available, include peer without signed record

prune_msg.peers.append(peer_info_msg)

control_msg: rpc_pb2.ControlMessage = rpc_pb2.ControlMessage()
control_msg.prune.extend([prune_msg])
Expand Down
29 changes: 29 additions & 0 deletions libp2p/pubsub/pubsub.py
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,18 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
if self._is_msg_seen(msg):
return

try:
scorer = getattr(self.router, "scorer", None)
if scorer is not None:
if not scorer.allow_publish(msg_forwarder, list(msg.topicIDs)):
logger.debug(
"Rejecting message from %s by publish score gate", msg_forwarder
)
return
except Exception:
# Router may not support scoring; ignore gracefully
pass

# Check if signing is required and if so validate the signature
if self.strict_signing:
# Validate the signature of the message
Expand All @@ -778,6 +790,14 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:
try:
await self.validate_msg(msg_forwarder, msg)
except ValidationError:
# Scoring: count invalid messages
try:
scorer = getattr(self.router, "scorer", None)
if scorer is not None:
for topic in msg.topicIDs:
scorer.on_invalid_message(msg_forwarder, topic)
except Exception:
pass
logger.debug(
"Topic validation failed: sender %s sent data %s under topic IDs: %s %s:%s", # noqa: E501
msg_forwarder,
Expand All @@ -790,6 +810,15 @@ async def push_msg(self, msg_forwarder: ID, msg: rpc_pb2.Message) -> None:

self._mark_msg_seen(msg)

# Scoring: first delivery for this sender per topic
try:
scorer = getattr(self.router, "scorer", None)
if scorer is not None:
for topic in msg.topicIDs:
scorer.on_first_delivery(msg_forwarder, topic)
except Exception:
pass

# reject messages claiming to be from ourselves but not locally published
self_id = self.host.get_id()
if (
Expand Down
Loading
Loading