enh/871-add-peerscore-scorefunc-signdpeer-gossipsub-1.1 #872
…' of https://github.com/Winter-Soren/py-libp2p into enh/871-add-peerscore-scorefunc-signdpeer-gossipsub-1.1
Issue Summary
The peer exchange test (
Root Cause Analysis
Before Gossipsub v1.1 Changes:
After Gossipsub v1.1 Changes:
Error Logs Observed
Solution Implemented
Key Changes
In
In
@Winter-Soren : Wonderful progress, Soham. Great work. Adding @lla-dane and @sumanjeet0012 to keep them in sync with the latest gossipsub developments in this PR.
@Winter-Soren : Great work, Soham. Thank you for your continued efforts. I am reviewing this PR in detail and have re-run the CI/CD pipeline. It would be great if you could resolve the CI/CD issues.
…ncreasing decay iterations
@Winter-Soren : Great, nice work Soham. This PR is indeed ready for final review. Reviewing it tonight. Also, CCing for feedback and final thoughts from @sumanjeet0012, @lla-dane, @acul71 and @sukhman-sukh.
…h peer message delivery
…ability and partitioning
…' of https://github.com/Winter-Soren/py-libp2p into enh/871-add-peerscore-scorefunc-signdpeer-gossipsub-1.1
…es when a single topic is provided
Hi @seetadev, thank you so much for your feedback and for sharing the test scenarios in #981. I've now added comprehensive test coverage for all the key GossipSub v1.1 features mentioned in the discussion:
Test Coverage Implementation
I've created the following test files to cover all the required scenarios:
All the test scenarios mentioned in #981 are now covered, ensuring that py-libp2p's GossipSub v1.1 implementation aligns with the specification and is resilient under various conditions.
…res when a single topic is provided
… explicit score reset and verification
@Winter-Soren : Great, thank you so much for making improvements and incorporating the feedback points on the test suite. @pacrob and @acul71: Hi Paul and Luca. We would appreciate your thoughts and feedback on the PR. This is indeed an important PR and enables us to have a fully functioning Gossipsub 1.1 version.
More than reviewing, I'm aiming to learn from this PR :-) Review here:
PR #872 Review: Gossipsub v1.1 Peer Scoring and Signed Peer Records
Overview
This PR implements Gossipsub v1.1 features including peer scoring, score-based gates, and signed peer records in peer exchange. The implementation adds significant resilience against spam and Sybil attacks while maintaining backward compatibility.
What's Been Implemented
1. Peer Scoring System
Implements the Gossipsub v1.1 scoring mechanism that tracks peer behavior and assigns scores based on message delivery, time in mesh, and invalid messages. Scores are used to make decisions about message acceptance, gossip propagation, and mesh management.
Core Implementation:
class PeerScorer:
    def __init__(self, params: ScoreParams) -> None:
        self.params = params
        self.time_in_mesh: DefaultDict[ID, DefaultDict[str, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        self.first_message_deliveries: DefaultDict[ID, DefaultDict[str, float]] = (
            defaultdict(lambda: defaultdict(float))
        )
        self.mesh_message_deliveries: DefaultDict[ID, DefaultDict[str, float]] = (
            defaultdict(lambda: defaultdict(float))
        )
        self.invalid_messages: DefaultDict[ID, DefaultDict[str, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        # Global state
        self.behavior_penalty: dict[ID, float] = defaultdict(float)
Score Parameters:
@dataclass
class ScoreParams:
    # Topic-scoped P1..P4
    p1_time_in_mesh: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p2_first_message_deliveries: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p3_mesh_message_deliveries: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p4_invalid_messages: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    # Global P5..P7
    p5_behavior_penalty_weight: float = 0.0
    p5_behavior_penalty_decay: float = 1.0
    p5_behavior_penalty_threshold: float = 0.0
    # Acceptance thresholds
    publish_threshold: float = -math.inf
    gossip_threshold: float = -math.inf
    graylist_threshold: float = -math.inf
    accept_px_threshold: float = -math.inf
2. Score Gates Integration
Applies peer scores as gates to control message flow: peers with low scores are blocked from publishing, gossiping, or participating in peer exchange. This prevents spam and improves network resilience.
Publish Gate:
try:
    scorer = getattr(self.router, "scorer", None)
    if scorer is not None:
        if not scorer.allow_publish(msg_forwarder, list(msg.topicIDs)):
            logger.debug(
                "Rejecting message from %s by publish score gate", msg_forwarder
            )
            return
except Exception:
    # Router may not support scoring; ignore gracefully
    pass
Gossip Gate:
def allow_gossip(self, peer: ID, topics: list[str]) -> bool:
    """Check if a peer is allowed to gossip about the given topics."""
    if not topics:
        return False
    if len(topics) == 1:
        topic = topics[0]
        topic_score = self.topic_score(peer, topic)
        # Apply behavior penalty if applicable
        if self.behavior_penalty[peer] > self.params.p5_behavior_penalty_threshold:
            topic_score -= (
                self.behavior_penalty[peer]
                - self.params.p5_behavior_penalty_threshold
            ) * self.params.p5_behavior_penalty_weight
        return topic_score >= self.params.gossip_threshold
    return self.score(peer, topics) >= self.params.gossip_threshold
3. Signed Peer Records in Peer Exchange
Enhances peer exchange (PX) with cryptographically signed peer records that provide authenticated peer information. This enables secure and faster address resolution when peers are pruned from the mesh.
PX Implementation:
async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
    for peer in px_peers:
        peer_id: ID = ID(peer.peerID)
        if peer.HasField("signedPeerRecord") and len(peer.signedPeerRecord) > 0:
            # Validate envelope signature and freshness via peerstore consume
            envelope, record = consume_envelope(
                peer.signedPeerRecord, "libp2p-peer-record"
            )
            # Ensure the record matches the advertised peer id
            if record.peer_id != peer_id:
                raise ValueError("peer id mismatch in PX signed record")
            # Store into peerstore and update addrs
            self.pubsub.host.get_peerstore().consume_peer_record(
                envelope, ttl=7200
            )
            peer_info = PeerInfo(record.peer_id, record.addrs)
            await self.pubsub.host.connect(peer_info)
Signed Record Validation:
def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
    if msg.HasField("senderRecord"):
        try:
            envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
            if not record.peer_id == peer_id:
                return False
            # Use the default TTL of 2 hours (7200 seconds)
            if not host.get_peerstore().consume_peer_record(envelope, 7200):
                logger.error("Failed to update the Certified-Addr-Book")
                return False
        except Exception as e:
            logger.error("Failed to update the Certified-Addr-Book: %s", e)
            return False
    return True
4. Opportunistic Grafting
Automatically improves mesh quality by grafting high-scoring peers that exceed the median score of current mesh peers. This helps maintain a healthy mesh with reliable peers even when the network topology changes.
Mesh Heartbeat with Opportunistic Grafting:
# Opportunistic grafting based on median scores
if self.scorer is not None and num_mesh_peers_in_topic >= self.degree_low:
    try:
        scorer = self.scorer
        scores = [scorer.score(p, [topic]) for p in self.mesh[topic]]
        if scores:
            median_score = statistics.median(scores)
            # Find higher-than-median peers outside mesh
            candidates = self._get_in_topic_gossipsub_peers_from_minus(
                topic, self.degree, self.mesh[topic], True
            )
            for cand in candidates:
                if scorer.score(cand, [topic]) > median_score:
                    self.mesh[topic].add(cand)
                    peers_to_graft[cand].append(topic)
                    break
    except Exception:
        pass
5. Heartbeat-Driven Score Decay
Implements score decay over time to ensure that past behavior doesn't permanently affect peer scores. This allows peers to recover from temporary issues and prevents indefinite penalties for past mistakes.
Score Decay Integration:
async def heartbeat(self) -> None:
    while True:
        # Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
        peers_to_graft, peers_to_prune = self.mesh_heartbeat()
        # Maintain fanout
        self.fanout_heartbeat()
        # Get the peers to send IHAVE to
        peers_to_gossip = self.gossip_heartbeat()
        # Pack(piggyback) GRAFT, PRUNE and IHAVE for the same peer into
        # one control message and send it
        await self._emit_control_msgs(
            peers_to_graft, peers_to_prune, peers_to_gossip
        )
        self.mcache.shift()
        # scorer decay step
        if self.scorer is not None:
            self.scorer.on_heartbeat()
        await trio.sleep(self.heartbeat_interval)
How Peer Scoring Works
Score Calculation Example
Topic-Scoped Scoring (P1-P4):
Total Topic Score:
Global Scoring (P5-P7):
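As a rough illustration of how the topic-scoped terms (P1-P4) and the P5 behavior penalty might combine, here is a minimal sketch. The `Term` dataclass, `topic_score`, and `behavior_penalty_term` are hypothetical names, and the linear "weight times capped counter" form is a simplifying assumption rather than the PR's actual formula. Only the "excess over threshold, times weight" penalty mirrors the `allow_gossip` snippet quoted earlier in this review.

```python
from dataclasses import dataclass


@dataclass
class Term:
    """Hypothetical per-counter parameters: a weight and a cap on the counter."""

    weight: float = 0.0
    cap: float = float("inf")


def topic_score(counters: dict[str, float], terms: dict[str, Term]) -> float:
    """Sum weight * min(counter, cap) over the topic-scoped terms."""
    total = 0.0
    for name, term in terms.items():
        total += term.weight * min(counters.get(name, 0.0), term.cap)
    return total


def behavior_penalty_term(penalty: float, threshold: float, weight: float) -> float:
    """Only the part of the penalty above the threshold is counted."""
    if penalty > threshold:
        return (penalty - threshold) * weight
    return 0.0


# Illustrative numbers only; none of these values come from the PR.
terms = {
    "time_in_mesh": Term(weight=1.0, cap=10.0),
    "first_message_deliveries": Term(weight=2.0, cap=5.0),
    "mesh_message_deliveries": Term(weight=0.5, cap=20.0),
    "invalid_messages": Term(weight=-10.0),
}
counters = {
    "time_in_mesh": 12.0,  # capped at 10.0
    "first_message_deliveries": 3.0,
    "mesh_message_deliveries": 4.0,
    "invalid_messages": 1.0,
}

# Subtract the penalty, as the allow_gossip snippet does.
score = topic_score(counters, terms) - behavior_penalty_term(
    penalty=3.0, threshold=1.0, weight=2.0
)
print(score)
```

With these made-up inputs the topic terms sum to 10 + 6 + 2 - 10 = 8, and the penalty removes (3 - 1) * 2 = 4.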
Score Gates in Action
Publish Gate Example:
Gossip Gate Example:
Graylisting Example:
Opportunistic Grafting Example
Areas for Improvement
1. Error Handling
Current Issue: Bare exception handling in opportunistic grafting
try:
    # opportunistic grafting logic
except Exception:
    pass
Improvement: Add specific exception handling and logging
try:
    # opportunistic grafting logic
except (ValueError, KeyError) as e:
    logger.warning("Opportunistic grafting failed: %s", e)
except Exception as e:
    logger.error("Unexpected error in opportunistic grafting: %s", e)
2. P6/P7 Parameters
Current Issue: Placeholder implementation
# TODO: P6/P7 placeholders: app-specific and IP-colocation
# terms (not implemented).
Improvement: Add documentation about future implementation
# TODO: P6 (Application-specific penalty) and P7 (IP colocation penalty)
# These require application-specific logic and will be implemented
# when concrete use cases are identified.
3. Score Parameter Documentation
Current Issue: Default thresholds use -math.inf
publish_threshold: float = -math.inf
gossip_threshold: float = -math.inf
Improvement: Add documentation and reasonable defaults
# Default thresholds (permissive for initial implementation)
# Consider tuning based on network conditions
publish_threshold: float = -math.inf  # Allow all publishes initially
gossip_threshold: float = -math.inf  # Allow all gossip initially
4. Protocol Version Detection
Missing: Explicit v1.0 vs v1.1 protocol negotiation
def supports_scoring(self, peer_id: ID) -> bool:
    """Check if peer supports Gossipsub v1.1 scoring features."""
    return self.peer_protocol.get(peer_id) == PROTOCOL_ID_V11
5. Score Metrics and Observability
Missing: Metrics for score behavior
def get_score_stats(self, peer_id: ID, topic: str) -> dict:
    """Get detailed score statistics for a peer."""
    return {
        "time_in_mesh": self.time_in_mesh[peer_id][topic],
        "first_deliveries": self.first_message_deliveries[peer_id][topic],
        "mesh_deliveries": self.mesh_message_deliveries[peer_id][topic],
        "invalid_messages": self.invalid_messages[peer_id][topic],
        "behavior_penalty": self.behavior_penalty[peer_id],
        "total_score": self.score(peer_id, [topic])
    }
Conclusion
The implementation successfully adds Gossipsub v1.1 features with proper peer scoring, signed peer records, and comprehensive test coverage. The code is well-structured and follows the specification closely. Minor improvements in error handling and documentation would enhance the implementation further.
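For readers following the decay discussion in this review, the heartbeat step could look roughly like the sketch below. It assumes a single multiplicative `decay` factor and a hypothetical `decay_to_zero` cutoff below which a counter snaps to zero. The function name `decay_counters` is illustrative, and the PR's actual `on_heartbeat()` may work differently.

```python
from collections import defaultdict


def decay_counters(counters, decay, decay_to_zero=0.01):
    """Multiply every per-peer, per-topic counter by `decay`, in place.

    Values whose magnitude falls below `decay_to_zero` are reset to 0.0 so
    that old behavior eventually stops influencing the score.
    """
    for per_topic in counters.values():
        for topic in list(per_topic):
            value = per_topic[topic] * decay
            per_topic[topic] = 0.0 if abs(value) < decay_to_zero else value


# Same nested-defaultdict shape as the PeerScorer counters quoted above.
invalid_messages = defaultdict(lambda: defaultdict(float))
invalid_messages["peerA"]["news"] = 4.0

# Three simulated heartbeats with decay=0.5: 4.0 -> 2.0 -> 1.0 -> 0.5
for _ in range(3):
    decay_counters(invalid_messages, decay=0.5)

print(invalid_messages["peerA"]["news"])
```

A decay factor of 1.0, the default shown for `p5_behavior_penalty_decay`, would leave counters unchanged under this model.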
…ection, docs, and observability
@acul71 Thank you for the comprehensive and detailed review! I really appreciate the thorough analysis and the specific improvement suggestions. I've implemented all 5 areas you identified:
✅ Implemented Improvements
1. Enhanced Error Handling in Opportunistic Grafting
2. Improved P6/P7 Documentation
3. Added Score Parameter Documentation
4. Implemented Protocol Version Detection
5. Added Score Metrics and Observability
6. Comprehensive Test Coverage
Code Quality
The implementation now provides much better observability, error handling, and documentation while maintaining the high quality of the original Gossipsub v1.1 features. Thank you for the excellent feedback; it really helped improve the implementation!
@Winter-Soren : Great efforts. Appreciate the contribution. I re-ran the CI/CD pipeline and there are some issues; it would be great if you could fix them. I am reviewing the PR in detail and will share feedback soon.
@pacrob @seetadev @Winter-Soren Hello. I've seen that all requests have been fulfilled. Thanks!!
📋 PR #872 Review Analysis: acul71's Requests vs Implementation Status
Based on my analysis of the PR messages and code, here's what acul71 requested and what has been implemented:
✅ ALL 5 AREAS REQUESTED BY ACUL71 HAVE BEEN IMPLEMENTED
🎯 1. Enhanced Error Handling in Opportunistic Grafting ✅ IMPLEMENTED
acul71's Request:
Implementation Status: ✅ COMPLETE
# libp2p/pubsub/gossipsub.py:666-675
except (ValueError, KeyError) as e:
    logger.warning(
        "Opportunistic grafting failed for topic %s: %s", topic, e
    )
except Exception as e:
    logger.error(
        "Unexpected error in opportunistic grafting for topic %s: %s",
        topic,
        e,
    )
🎯 2. Improved P6/P7 Documentation ✅ IMPLEMENTED
acul71's Request:
Implementation Status: ✅ COMPLETE
# libp2p/pubsub/score.py:160-164
# TODO: P6 (Application-specific penalty) and P7 (IP colocation penalty)
# These require application-specific logic and will be implemented
# when concrete use cases are identified.
# P6: App-specific scoring based on custom application metrics
# P7: IP colocation penalty to prevent Sybil attacks from same IP ranges
🎯 3. Added Score Parameter Documentation ✅ IMPLEMENTED
acul71's Request:
Implementation Status: ✅ COMPLETE
# libp2p/pubsub/score.py:44-50
# Acceptance thresholds (permissive defaults for initial implementation)
# These defaults allow all messages initially. In production environments,
# consider tuning based on network conditions and attack models:
# - publish_threshold: Minimum score to accept published messages (e.g., 0.0)
# - gossip_threshold: Minimum score to gossip about peer (e.g., -1.0)
# - graylist_threshold: Score below which peer is ignored (e.g., -10.0)
# - accept_px_threshold: Minimum score to accept PX from peer (e.g., 0.0)
🎯 4. Protocol Version Detection ✅ IMPLEMENTED
acul71's Request:
Implementation Status: ✅ COMPLETE
# libp2p/pubsub/gossipsub.py:176-183
def supports_scoring(self, peer_id: ID) -> bool:
    """
    Check if peer supports Gossipsub v1.1 scoring features.
    :param peer_id: The peer to check
    :return: True if peer supports v1.1 features, False otherwise
    """
    return self.peer_protocol.get(peer_id) == PROTOCOL_ID_V11
🎯 5. Score Metrics and Observability ✅ IMPLEMENTED
acul71's Request:
Implementation Status: ✅ COMPLETE
# libp2p/pubsub/score.py:261-278
def get_score_stats(self, peer: ID, topic: str) -> dict[str, float]:
    """
    Get detailed score statistics for a peer in a specific topic.
    Useful for debugging, monitoring, and understanding peer behavior.
    :param peer: The peer ID to get stats for
    :param topic: The topic to get stats for
    :return: Dictionary containing all score components and total score
    """
    return {
        "time_in_mesh": self.time_in_mesh[peer][topic],
        "first_deliveries": self.first_message_deliveries[peer][topic],
        "mesh_deliveries": self.mesh_message_deliveries[peer][topic],
        "invalid_messages": self.invalid_messages[peer][topic],
        "behavior_penalty": self.behavior_penalty[peer],
        "total_score": self.score(peer, [topic]),
    }
🎯 6. PACROB'S FEEDBACK ✅ ADDRESSED
pacrob's Request:
Implementation Status: ✅ ADDRESSED
🎯 7. SEETADEV'S REQUESTS ✅ IMPLEMENTED
seetadev's Requests:
📊 SUMMARY: 100% IMPLEMENTATION RATE
🎉 CONCLUSION
All feedback from acul71, pacrob, and seetadev has been successfully implemented! PR #872 now includes:
The implementation is production-ready and addresses all maintainer concerns! 🚀
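To make the threshold semantics from item 3 concrete, here is a small sketch that plugs in the example values from the quoted comments (0.0, -1.0, -10.0, 0.0). The `Thresholds` dataclass and `gate_decisions` helper are hypothetical and not part of the PR. The `>=` comparison follows the `allow_gossip` snippet, and the graylist check assumes "score below the threshold means ignored", as the comment describes.

```python
from dataclasses import dataclass


@dataclass
class Thresholds:
    """Example (non-default) thresholds taken from the comments in item 3."""

    publish_threshold: float = 0.0
    gossip_threshold: float = -1.0
    graylist_threshold: float = -10.0
    accept_px_threshold: float = 0.0


def gate_decisions(score: float, t: Thresholds) -> dict[str, bool]:
    """Return which gates a peer with the given score would pass."""
    return {
        "publish": score >= t.publish_threshold,
        "gossip": score >= t.gossip_threshold,
        "graylisted": score < t.graylist_threshold,
        "accept_px": score >= t.accept_px_threshold,
    }


# A mildly negative peer: still gossiped to, but not allowed to publish or PX.
print(gate_decisions(-0.5, Thresholds()))
# A heavily penalized peer: fails every gate and is graylisted.
print(gate_decisions(-20.0, Thresholds()))
```

With the permissive `-math.inf` defaults currently in `ScoreParams`, every finite score would pass all gates and no peer would be graylisted.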
@Winter-Soren : Fantastic work on the GossipSub 1.1 protocol update! 🎉 The PR now reflects better observability and production readiness. @acul71 : Thank you for sharing that Soham has successfully implemented get_score_stats(), addressed all the feedback points we shared, ensured full test coverage, and resolved the CI/CD issues. This is a neat contribution to the project, and I'll ask @sumanjeet0012 to try out its implementation in the universal connectivity dapp. I'll also follow up with @acul71 on gossipsub interop efforts: https://github.com/libp2p/test-plans/tree/master/gossipsub-interop. We will open a new GitHub issue for the same.
What was wrong?
Gossipsub v1.1 features were incomplete in py-libp2p:
Refs:
Issue #871
How was it fixed?
signedPeerRecord envelopes, ensure peerID match, upsert in peerstore, connect
PeerScorer with P1–P4 + P5 (weights, caps, decay, thresholds)
Summary of approach
libp2p/pubsub/score.py with minimal decayed scoring model and thresholds
libp2p/pubsub/gossipsub.py:
libp2p/pubsub/pubsub.py:
To-Do