Skip to content

Conversation

@Winter-Soren
Copy link
Contributor

What was wrong?

Gossipsub v1.1 features were incomplete in py-libp2p:

  • PX lacked signed peer records validation and peerstore updates
  • No score-based gates (publish, gossip, PX, graylist)
  • No opportunistic grafting
  • Incomplete scoring event wiring

Refs:

Issue #871

How was it fixed?

  • PX: validate signedPeerRecord envelopes, ensure peerID match, upsert in peerstore, connect
  • Scoring: new PeerScorer with P1–P4 + P5 (weights, caps, decay, thresholds)
  • Gates: enforce publish acceptance (inbound), gossip emission, PX acceptance, and graylisting
  • Mesh: join/leave hooks; opportunistic grafting using median mesh score
  • Heartbeat-driven decay; integrated without breaking existing APIs

Summary of approach

  • Added libp2p/pubsub/score.py with minimal decayed scoring model and thresholds
  • Extended libp2p/pubsub/gossipsub.py:
    • PRUNE emit/handle: carry PX peers; validate/store records; gate PX by score
    • GRAFT/PRUNE/mesh mgmt: join/leave hooks and opportunistic grafting
    • Gossip/publish: score gates applied
  • Updated libp2p/pubsub/pubsub.py:
    • Inbound publish acceptance gate
    • Scoring updates on validation failure and first delivery

To-Do

  • Clean up commit history
  • Add or update documentation related to these changes
  • Add entry to the release notes

Cute Animal Picture

cute animal

@Winter-Soren Winter-Soren changed the title added PX signed peer records, scoring gates, and opportunistic grafting enh/871-add-peerscore-scorefunc-signdpeer-gossipsub-1.1 Aug 26, 2025
@Winter-Soren
Copy link
Contributor Author

Issue Summary

The peer exchange test (test_peer_exchange) started failing after implementing Gossipsub v1.1 signed peer records. This was due to the test environment not having proper signed peer records, which the new v1.1 implementation requires.

Root Cause Analysis

Before Gossipsub v1.1 Changes:

  • emit_prune used peer_info_to_bytes(peer_info) which created simple text format: "peer_id\naddr1\naddr2"
  • _do_px tried to parse this as signed peer records, causing "Error parsing message" errors
  • The test was likely passing due to silent failures or other mechanisms

After Gossipsub v1.1 Changes:

  • emit_prune was updated to use proper signed peer records via envelope.marshal_envelope()
  • Test environment had no signed peer records in peerstore
  • When host_1 sent PRUNE to host_0 with host_2 as PX candidate, no signed records existed for host_2
  • Peer exchange data was empty, so no connection happened
  • Test failed: assert host_2.get_id() in gsub0.mesh[topic]

Error Logs Observed

failed to parse peer info from px peer QmcLkaJx9AumkgYseowvsL9UE9ATE39irYuig8RTZQZ6fo: Error parsing message
failed to parse peer info from px peer QmSg4vX87SYgPwXdLzDyGfDRaL1RyiEnk7UKvnyqimxJTc: Error parsing message

Solution Implemented

  1. Fixed Signed Peer Record Format: Updated emit_prune to use envelope.marshal_envelope() instead of peer_info_to_bytes()

  2. Added Fallback Mechanism: Modified _do_px to handle cases where signed records aren't available:

    # Try to get existing peer info from peerstore
    existing_peer_info = self.pubsub.host.get_peerstore().peer_info(peer_id)
    await self.pubsub.host.connect(existing_peer_info)
  3. Updated Test Setup: Made the test more realistic by:

    • Adding initial connection between host_0 and host_2 so peer info exists in peerstore
    • Disconnecting them to simulate the PX scenario
    • Ensuring proper mesh state for testing

Key Changes

In libp2p/pubsub/gossipsub.py:

  • Fixed emit_prune to use proper signed envelope format
  • Added fallback in _do_px for peers without signed records
  • Removed unused peer_info_to_bytes import

In tests/core/pubsub/test_gossipsub_px_and_backoff.py:

  • Added await connect(host_0, host_2) for initial peer discovery
  • Added disconnection logic to simulate PX scenario
  • Ensured proper mesh state assertions

@seetadev
Copy link
Contributor

seetadev commented Sep 1, 2025

@Winter-Soren : Wonderful progress, Soham. Great work.

Adding @lla-dane and @sumanjeet0012 to keep them in sync with the latest gossipsub developments in this PR.

@seetadev
Copy link
Contributor

@Winter-Soren : Great work, Soham. Thank you for your continued efforts. Reviewing this PR in detail.

Re-ran the CI/CD pipeline. Wish if you could resolve the CI/CD issues.

@Winter-Soren Winter-Soren marked this pull request as ready for review September 22, 2025 07:12
@seetadev
Copy link
Contributor

@Winter-Soren : Great, nice work Soham. This PR is indeed ready for final review. Reviewing it tonight.

Also, CCing for feedback and final thoughts from @sumanjeet0012, @lla-dane, @acul71 and @sukhman-sukh.

@Winter-Soren
Copy link
Contributor Author

@Winter-Soren : Hi Soham. Appreciate your efforts. We discussed on your PR. Wish if you could add some key scenarios in the test suite and check if all the tests shared at #981 are covered in this PR.

This is indeed a very important PR. We will following it up with gossipsub interop and also develop a boilerplate py-libp2p example to enable developer adoption of this module.

@pacrob and @acul71: Hi Paul and Luca. Wish to have your thoughts and feedback as @Winter-Soren reviews the test case scenarios shared at #981

Hi @seetadev Thank you so much for your feedback and for sharing the test scenarios in #981. I've now added comprehensive test coverage for all the key GossipSub v1.1 features mentioned in the discussion:

Test Coverage Implementation

I've created the following test files to cover all the required scenarios:

  1. Core Functionality

    • test_gossipsub_v1_1_core_functionality.py: Tests message propagation under normal mesh conditions and interoperability with GossipSub v1.0 peers with graceful fallback
  2. Peer Scoring

    • test_gossipsub_v1_1_peer_scoring_behavior.py: Tests score decreases for invalid/duplicate messages, score stability for honest peers, and pruning of low-scoring peers
  3. IHAVE/IWANT Adaptive Gossip

    • test_gossipsub_v1_1_ihave_iwant.py: Tests IWANT requests triggered by dropped gossip and prevention of infinite gossip loops
  4. Opportunistic Grafting

    • test_gossipsub_v1_1_opportunistic_grafting.py: Tests mesh improvement with high-scoring peers and simulation of degraded mesh quality
  5. Flood Publishing

    • test_gossipsub_v1_1_flood_publishing.py: Tests publishing from non-mesh peers to verify reliable message delivery
  6. Invalid Behavior Rejection

    • test_gossipsub_v1_1_invalid_behavior.py: Tests rejection of invalid signatures/malformed payloads and penalization of excessive IHAVE/IWANT spam
  7. Network-Level Scenarios

    • test_gossipsub_v1_1_network_scenarios.py: Tests large-scale fanout, partition & reconnect, and mesh stability under changing peer connections

All the test scenarios mentioned in #981 are now covered, ensuring that py-libp2p's GossipSub v1.1 implementation aligns with the specification and is resilient under various conditions.

@seetadev
Copy link
Contributor

@Winter-Soren : Great, thank you so much for making improvements and incorporating the feedback points on test suite.

@pacrob and @acul71: Hi Paul and Luca. Wish to have your thoughts and feedback on the PR. This is indeed an important PR and enables us to have a fully functioning Gossipsub 1.1 version.

@acul71
Copy link
Contributor

acul71 commented Oct 16, 2025

@pacrob and @acul71: Hi Paul and Luca. Wish to have your thoughts and feedback on the PR. This is indeed an important PR and enables us to have a fully functioning Gossipsub 1.1 version.

More then reviewing I'm aiming to learn from this PR :-)

Review here:

PR #872 Review: Gossipsub v1.1 Peer Scoring and Signed Peer Records

Overview

This PR implements Gossipsub v1.1 features including peer scoring, score-based gates, and signed peer records in peer exchange. The implementation adds significant resilience against spam and Sybil attacks while maintaining backward compatibility.

What's Been Implemented

1. Peer Scoring System

Implements the Gossipsub v1.1 scoring mechanism that tracks peer behavior and assigns scores based on message delivery, time in mesh, and invalid messages. Scores are used to make decisions about message acceptance, gossip propagation, and mesh management.

Core Implementation (libp2p/pubsub/score.py):

class PeerScorer:
    def __init__(self, params: ScoreParams) -> None:
        self.params = params
        self.time_in_mesh: DefaultDict[ID, DefaultDict[str, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        self.first_message_deliveries: DefaultDict[ID, DefaultDict[str, float]] = (
            defaultdict(lambda: defaultdict(float))
        )
        self.mesh_message_deliveries: DefaultDict[ID, DefaultDict[str, float]] = (
            defaultdict(lambda: defaultdict(float))
        )
        self.invalid_messages: DefaultDict[ID, DefaultDict[str, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        # Global state
        self.behavior_penalty: dict[ID, float] = defaultdict(float)

Score Parameters:

@dataclass
class ScoreParams:
    # Topic-scoped P1..P4
    p1_time_in_mesh: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p2_first_message_deliveries: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p3_mesh_message_deliveries: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p4_invalid_messages: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    
    # Global P5..P7
    p5_behavior_penalty_weight: float = 0.0
    p5_behavior_penalty_decay: float = 1.0
    p5_behavior_penalty_threshold: float = 0.0
    
    # Acceptance thresholds
    publish_threshold: float = -math.inf
    gossip_threshold: float = -math.inf
    graylist_threshold: float = -math.inf
    accept_px_threshold: float = -math.inf

2. Score Gates Integration

Applies peer scores as gates to control message flow - peers with low scores are blocked from publishing, gossiping, or participating in peer exchange. This prevents spam and improves network resilience.

Publish Gate (libp2p/pubsub/pubsub.py):

try:
    scorer = getattr(self.router, "scorer", None)
    if scorer is not None:
        if not scorer.allow_publish(msg_forwarder, list(msg.topicIDs)):
            logger.debug(
                "Rejecting message from %s by publish score gate", msg_forwarder
            )
            return
except Exception:
    # Router may not support scoring; ignore gracefully
    pass

Gossip Gate (libp2p/pubsub/score.py):

def allow_gossip(self, peer: ID, topics: list[str]) -> bool:
    """Check if a peer is allowed to gossip about the given topics."""
    if not topics:
        return False
    
    if len(topics) == 1:
        topic = topics[0]
        topic_score = self.topic_score(peer, topic)
        # Apply behavior penalty if applicable
        if self.behavior_penalty[peer] > self.params.p5_behavior_penalty_threshold:
            topic_score -= (
                self.behavior_penalty[peer]
                - self.params.p5_behavior_penalty_threshold
            ) * self.params.p5_behavior_penalty_weight
        return topic_score >= self.params.gossip_threshold
    
    return self.score(peer, topics) >= self.params.gossip_threshold

3. Signed Peer Records in Peer Exchange

Enhances peer exchange (PX) with cryptographically signed peer records that provide authenticated peer information. This enables secure and faster address resolution when peers are pruned from the mesh.

PX Implementation (libp2p/pubsub/gossipsub.py):

async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
    for peer in px_peers:
        peer_id: ID = ID(peer.peerID)
        
        if peer.HasField("signedPeerRecord") and len(peer.signedPeerRecord) > 0:
            # Validate envelope signature and freshness via peerstore consume
            envelope, record = consume_envelope(
                peer.signedPeerRecord, "libp2p-peer-record"
            )
            
            # Ensure the record matches the advertised peer id
            if record.peer_id != peer_id:
                raise ValueError("peer id mismatch in PX signed record")
            
            # Store into peerstore and update addrs
            self.pubsub.host.get_peerstore().consume_peer_record(
                envelope, ttl=7200
            )
            
            peer_info = PeerInfo(record.peer_id, record.addrs)
            await self.pubsub.host.connect(peer_info)

Signed Record Validation (libp2p/pubsub/utils.py):

def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
    if msg.HasField("senderRecord"):
        try:
            envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
            if not record.peer_id == peer_id:
                return False
            
            # Use the default TTL of 2 hours (7200 seconds)
            if not host.get_peerstore().consume_peer_record(envelope, 7200):
                logger.error("Failed to update the Certified-Addr-Book")
                return False
        except Exception as e:
            logger.error("Failed to update the Certified-Addr-Book: %s", e)
            return False
    return True

4. Opportunistic Grafting

Automatically improves mesh quality by grafting high-scoring peers that exceed the median score of current mesh peers. This helps maintain a healthy mesh with reliable peers even when the network topology changes.

Mesh Heartbeat with Opportunistic Grafting (libp2p/pubsub/gossipsub.py):

# Opportunistic grafting based on median scores
if self.scorer is not None and num_mesh_peers_in_topic >= self.degree_low:
    try:
        scorer = self.scorer
        scores = [scorer.score(p, [topic]) for p in self.mesh[topic]]
        if scores:
            median_score = statistics.median(scores)
            # Find higher-than-median peers outside mesh
            candidates = self._get_in_topic_gossipsub_peers_from_minus(
                topic, self.degree, self.mesh[topic], True
            )
            for cand in candidates:
                if scorer.score(cand, [topic]) > median_score:
                    self.mesh[topic].add(cand)
                    peers_to_graft[cand].append(topic)
                    break
    except Exception:
        pass

5. Heartbeat-Driven Score Decay

Implements score decay over time to ensure that past behavior doesn't permanently affect peer scores. This allows peers to recover from temporary issues and prevents indefinite penalties for past mistakes.

Score Decay Integration (libp2p/pubsub/gossipsub.py):

async def heartbeat(self) -> None:
    while True:
        # Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
        peers_to_graft, peers_to_prune = self.mesh_heartbeat()
        # Maintain fanout
        self.fanout_heartbeat()
        # Get the peers to send IHAVE to
        peers_to_gossip = self.gossip_heartbeat()
        
        # Pack(piggyback) GRAFT, PRUNE and IHAVE for the same peer into
        # one control message and send it
        await self._emit_control_msgs(
            peers_to_graft, peers_to_prune, peers_to_gossip
        )
        
        self.mcache.shift()
        
        # scorer decay step
        if self.scorer is not None:
            self.scorer.on_heartbeat()
        
        await trio.sleep(self.heartbeat_interval)

How Peer Scoring Works

Score Calculation Example

Topic-Scoped Scoring (P1-P4):

  • P1 (Time in Mesh): score += weight * min(time_in_mesh, cap)

    • Example: Peer has been in mesh for 5 heartbeats, weight=0.1, cap=10
    • Score contribution: 0.1 * min(5, 10) = 0.5
  • P2 (First Message Deliveries): score += weight * min(first_deliveries, cap)

    • Example: Peer delivered 3 first messages, weight=1.0, cap=5
    • Score contribution: 1.0 * min(3, 5) = 3.0
  • P3 (Mesh Message Deliveries): score += weight * min(mesh_deliveries, cap)

    • Example: Peer delivered 8 messages in mesh, weight=0.5, cap=20
    • Score contribution: 0.5 * min(8, 20) = 4.0
  • P4 (Invalid Messages): score -= weight * min(invalid_messages, cap)

    • Example: Peer sent 2 invalid messages, weight=1.0, cap=10
    • Score penalty: -1.0 * min(2, 10) = -2.0

Total Topic Score: 0.5 + 3.0 + 4.0 - 2.0 = 5.5

Global Scoring (P5-P7):

  • P5 (Behavior Penalty): Applied when behavior_penalty > threshold
    • Example: Peer has behavior_penalty=5, threshold=3, weight=2.0
    • Score penalty: -(5 - 3) * 2.0 = -4.0

Score Gates in Action

Publish Gate Example:

Single topic message:
Peer A topic score: 2.5, publish_threshold: 1.0 → ALLOWED
Peer B topic score: 0.5, publish_threshold: 1.0 → REJECTED

Multi-topic message:
Peer A combined score: 5.0, publish_threshold: 1.0 → ALLOWED
Peer B combined score: 0.5, publish_threshold: 1.0 → REJECTED

Gossip Gate Example:

Peer A score: 3.0, gossip_threshold: 2.0 → ALLOWED to gossip
Peer B score: 1.5, gossip_threshold: 2.0 → NOT ALLOWED to gossip

Graylisting Example:

Peer A score: -1.0, graylist_threshold: 0.0 → GRAYLISTED
Peer B score: 1.0, graylist_threshold: 0.0 → NOT GRAYLISTED

Opportunistic Grafting Example

Current mesh peers: [A, B, C, D]
Peer scores: A=2.0, B=3.0, C=1.0, D=4.0
Median score: 2.5

Available candidates: [E, F, G]
Candidate scores: E=5.0, F=1.0, G=3.0

Result: The algorithm finds the first candidate with score > 2.5
        and grafts only that one. If E is checked first, only E gets grafted.
        If G is checked first, only G gets grafted. Only ONE peer is selected.

Areas for Improvement

1. Error Handling

Current Issue: Bare exception handling in opportunistic grafting

try:
    # opportunistic grafting logic
except Exception:
    pass

Improvement: Add specific exception handling and logging

try:
    # opportunistic grafting logic
except (ValueError, KeyError) as e:
    logger.warning("Opportunistic grafting failed: %s", e)
except Exception as e:
    logger.error("Unexpected error in opportunistic grafting: %s", e)

2. P6/P7 Parameters

Current Issue: Placeholder implementation

# TODO: P6/P7 placeholders: app-specific and IP-colocation
# terms (not implemented).

Improvement: Add documentation about future implementation

# TODO: P6 (Application-specific penalty) and P7 (IP colocation penalty)
# These require application-specific logic and will be implemented
# when concrete use cases are identified.

3. Score Parameter Documentation

Current Issue: Default thresholds use -math.inf

publish_threshold: float = -math.inf
gossip_threshold: float = -math.inf

Improvement: Add documentation and reasonable defaults

# Default thresholds (permissive for initial implementation)
# Consider tuning based on network conditions
publish_threshold: float = -math.inf  # Allow all publishes initially
gossip_threshold: float = -math.inf    # Allow all gossip initially

4. Protocol Version Detection

Missing: Explicit v1.0 vs v1.1 protocol negotiation
Improvement: Add protocol version detection

def supports_scoring(self, peer_id: ID) -> bool:
    """Check if peer supports Gossipsub v1.1 scoring features."""
    return self.peer_protocol.get(peer_id) == PROTOCOL_ID_V11

5. Score Metrics and Observability

Missing: Metrics for score behavior
Improvement: Add score monitoring

def get_score_stats(self, peer_id: ID, topic: str) -> dict:
    """Get detailed score statistics for a peer."""
    return {
        "time_in_mesh": self.time_in_mesh[peer_id][topic],
        "first_deliveries": self.first_message_deliveries[peer_id][topic],
        "mesh_deliveries": self.mesh_message_deliveries[peer_id][topic],
        "invalid_messages": self.invalid_messages[peer_id][topic],
        "behavior_penalty": self.behavior_penalty[peer_id],
        "total_score": self.score(peer_id, [topic])
    }

Conclusion

The implementation successfully adds Gossipsub v1.1 features with proper peer scoring, signed peer records, and comprehensive test coverage. The code is well-structured and follows the specification closely. Minor improvements in error handling and documentation would enhance the implementation further.

@Winter-Soren
Copy link
Contributor Author

@pacrob and @acul71: Hi Paul and Luca. Wish to have your thoughts and feedback on the PR. This is indeed an important PR and enables us to have a fully functioning Gossipsub 1.1 version.

More then reviewing I'm aiming to learn from this PR :-)

Review here:

PR #872 Review: Gossipsub v1.1 Peer Scoring and Signed Peer Records

Overview

This PR implements Gossipsub v1.1 features including peer scoring, score-based gates, and signed peer records in peer exchange. The implementation adds significant resilience against spam and Sybil attacks while maintaining backward compatibility.

What's Been Implemented

1. Peer Scoring System

Implements the Gossipsub v1.1 scoring mechanism that tracks peer behavior and assigns scores based on message delivery, time in mesh, and invalid messages. Scores are used to make decisions about message acceptance, gossip propagation, and mesh management.

Core Implementation (libp2p/pubsub/score.py):

class PeerScorer:
    def __init__(self, params: ScoreParams) -> None:
        self.params = params
        self.time_in_mesh: DefaultDict[ID, DefaultDict[str, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        self.first_message_deliveries: DefaultDict[ID, DefaultDict[str, float]] = (
            defaultdict(lambda: defaultdict(float))
        )
        self.mesh_message_deliveries: DefaultDict[ID, DefaultDict[str, float]] = (
            defaultdict(lambda: defaultdict(float))
        )
        self.invalid_messages: DefaultDict[ID, DefaultDict[str, float]] = defaultdict(
            lambda: defaultdict(float)
        )
        # Global state
        self.behavior_penalty: dict[ID, float] = defaultdict(float)

Score Parameters:

@dataclass
class ScoreParams:
    # Topic-scoped P1..P4
    p1_time_in_mesh: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p2_first_message_deliveries: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p3_mesh_message_deliveries: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    p4_invalid_messages: TopicScoreParams = field(
        default_factory=lambda: TopicScoreParams()
    )
    
    # Global P5..P7
    p5_behavior_penalty_weight: float = 0.0
    p5_behavior_penalty_decay: float = 1.0
    p5_behavior_penalty_threshold: float = 0.0
    
    # Acceptance thresholds
    publish_threshold: float = -math.inf
    gossip_threshold: float = -math.inf
    graylist_threshold: float = -math.inf
    accept_px_threshold: float = -math.inf

2. Score Gates Integration

Applies peer scores as gates to control message flow - peers with low scores are blocked from publishing, gossiping, or participating in peer exchange. This prevents spam and improves network resilience.

Publish Gate (libp2p/pubsub/pubsub.py):

try:
    scorer = getattr(self.router, "scorer", None)
    if scorer is not None:
        if not scorer.allow_publish(msg_forwarder, list(msg.topicIDs)):
            logger.debug(
                "Rejecting message from %s by publish score gate", msg_forwarder
            )
            return
except Exception:
    # Router may not support scoring; ignore gracefully
    pass

Gossip Gate (libp2p/pubsub/score.py):

def allow_gossip(self, peer: ID, topics: list[str]) -> bool:
    """Check if a peer is allowed to gossip about the given topics."""
    if not topics:
        return False
    
    if len(topics) == 1:
        topic = topics[0]
        topic_score = self.topic_score(peer, topic)
        # Apply behavior penalty if applicable
        if self.behavior_penalty[peer] > self.params.p5_behavior_penalty_threshold:
            topic_score -= (
                self.behavior_penalty[peer]
                - self.params.p5_behavior_penalty_threshold
            ) * self.params.p5_behavior_penalty_weight
        return topic_score >= self.params.gossip_threshold
    
    return self.score(peer, topics) >= self.params.gossip_threshold

3. Signed Peer Records in Peer Exchange

Enhances peer exchange (PX) with cryptographically signed peer records that provide authenticated peer information. This enables secure and faster address resolution when peers are pruned from the mesh.

PX Implementation (libp2p/pubsub/gossipsub.py):

async def _do_px(self, px_peers: list[rpc_pb2.PeerInfo]) -> None:
    for peer in px_peers:
        peer_id: ID = ID(peer.peerID)
        
        if peer.HasField("signedPeerRecord") and len(peer.signedPeerRecord) > 0:
            # Validate envelope signature and freshness via peerstore consume
            envelope, record = consume_envelope(
                peer.signedPeerRecord, "libp2p-peer-record"
            )
            
            # Ensure the record matches the advertised peer id
            if record.peer_id != peer_id:
                raise ValueError("peer id mismatch in PX signed record")
            
            # Store into peerstore and update addrs
            self.pubsub.host.get_peerstore().consume_peer_record(
                envelope, ttl=7200
            )
            
            peer_info = PeerInfo(record.peer_id, record.addrs)
            await self.pubsub.host.connect(peer_info)

Signed Record Validation (libp2p/pubsub/utils.py):

def maybe_consume_signed_record(msg: RPC, host: IHost, peer_id: ID) -> bool:
    if msg.HasField("senderRecord"):
        try:
            envelope, record = consume_envelope(msg.senderRecord, "libp2p-peer-record")
            if not record.peer_id == peer_id:
                return False
            
            # Use the default TTL of 2 hours (7200 seconds)
            if not host.get_peerstore().consume_peer_record(envelope, 7200):
                logger.error("Failed to update the Certified-Addr-Book")
                return False
        except Exception as e:
            logger.error("Failed to update the Certified-Addr-Book: %s", e)
            return False
    return True

4. Opportunistic Grafting

Automatically improves mesh quality by grafting high-scoring peers that exceed the median score of current mesh peers. This helps maintain a healthy mesh with reliable peers even when the network topology changes.

Mesh Heartbeat with Opportunistic Grafting (libp2p/pubsub/gossipsub.py):

# Opportunistic grafting based on median scores
if self.scorer is not None and num_mesh_peers_in_topic >= self.degree_low:
    try:
        scorer = self.scorer
        scores = [scorer.score(p, [topic]) for p in self.mesh[topic]]
        if scores:
            median_score = statistics.median(scores)
            # Find higher-than-median peers outside mesh
            candidates = self._get_in_topic_gossipsub_peers_from_minus(
                topic, self.degree, self.mesh[topic], True
            )
            for cand in candidates:
                if scorer.score(cand, [topic]) > median_score:
                    self.mesh[topic].add(cand)
                    peers_to_graft[cand].append(topic)
                    break
    except Exception:
        pass

5. Heartbeat-Driven Score Decay

Implements score decay over time to ensure that past behavior doesn't permanently affect peer scores. This allows peers to recover from temporary issues and prevents indefinite penalties for past mistakes.

Score Decay Integration (libp2p/pubsub/gossipsub.py):

async def heartbeat(self) -> None:
    while True:
        # Maintain mesh and keep track of which peers to send GRAFT or PRUNE to
        peers_to_graft, peers_to_prune = self.mesh_heartbeat()
        # Maintain fanout
        self.fanout_heartbeat()
        # Get the peers to send IHAVE to
        peers_to_gossip = self.gossip_heartbeat()
        
        # Pack(piggyback) GRAFT, PRUNE and IHAVE for the same peer into
        # one control message and send it
        await self._emit_control_msgs(
            peers_to_graft, peers_to_prune, peers_to_gossip
        )
        
        self.mcache.shift()
        
        # scorer decay step
        if self.scorer is not None:
            self.scorer.on_heartbeat()
        
        await trio.sleep(self.heartbeat_interval)

How Peer Scoring Works

Score Calculation Example

Topic-Scoped Scoring (P1-P4):

  • P1 (Time in Mesh): score += weight * min(time_in_mesh, cap)

    • Example: Peer has been in mesh for 5 heartbeats, weight=0.1, cap=10
    • Score contribution: 0.1 * min(5, 10) = 0.5
  • P2 (First Message Deliveries): score += weight * min(first_deliveries, cap)

    • Example: Peer delivered 3 first messages, weight=1.0, cap=5
    • Score contribution: 1.0 * min(3, 5) = 3.0
  • P3 (Mesh Message Deliveries): score += weight * min(mesh_deliveries, cap)

    • Example: Peer delivered 8 messages in mesh, weight=0.5, cap=20
    • Score contribution: 0.5 * min(8, 20) = 4.0
  • P4 (Invalid Messages): score -= weight * min(invalid_messages, cap)

    • Example: Peer sent 2 invalid messages, weight=1.0, cap=10
    • Score penalty: -1.0 * min(2, 10) = -2.0

Total Topic Score: 0.5 + 3.0 + 4.0 - 2.0 = 5.5

Global Scoring (P5-P7):

  • P5 (Behavior Penalty): Applied when behavior_penalty > threshold

    • Example: Peer has behavior_penalty=5, threshold=3, weight=2.0
    • Score penalty: -(5 - 3) * 2.0 = -4.0

Score Gates in Action

Publish Gate Example:

Single topic message:
Peer A topic score: 2.5, publish_threshold: 1.0 → ALLOWED
Peer B topic score: 0.5, publish_threshold: 1.0 → REJECTED

Multi-topic message:
Peer A combined score: 5.0, publish_threshold: 1.0 → ALLOWED
Peer B combined score: 0.5, publish_threshold: 1.0 → REJECTED

Gossip Gate Example:

Peer A score: 3.0, gossip_threshold: 2.0 → ALLOWED to gossip
Peer B score: 1.5, gossip_threshold: 2.0 → NOT ALLOWED to gossip

Graylisting Example:

Peer A score: -1.0, graylist_threshold: 0.0 → GRAYLISTED
Peer B score: 1.0, graylist_threshold: 0.0 → NOT GRAYLISTED

Opportunistic Grafting Example

Current mesh peers: [A, B, C, D]
Peer scores: A=2.0, B=3.0, C=1.0, D=4.0
Median score: 2.5

Available candidates: [E, F, G]
Candidate scores: E=5.0, F=1.0, G=3.0

Result: The algorithm finds the first candidate with score > 2.5
        and grafts only that one. If E is checked first, only E gets grafted.
        If G is checked first, only G gets grafted. Only ONE peer is selected.

Areas for Improvement

1. Error Handling

Current Issue: Bare exception handling in opportunistic grafting

try:
    # opportunistic grafting logic
except Exception:
    pass

Improvement: Add specific exception handling and logging

try:
    # opportunistic grafting logic
except (ValueError, KeyError) as e:
    logger.warning("Opportunistic grafting failed: %s", e)
except Exception as e:
    logger.error("Unexpected error in opportunistic grafting: %s", e)

2. P6/P7 Parameters

Current Issue: Placeholder implementation

# TODO: P6/P7 placeholders: app-specific and IP-colocation
# terms (not implemented).

Improvement: Add documentation about future implementation

# TODO: P6 (Application-specific penalty) and P7 (IP colocation penalty)
# These require application-specific logic and will be implemented
# when concrete use cases are identified.

3. Score Parameter Documentation

Current Issue: Default thresholds use -math.inf

publish_threshold: float = -math.inf
gossip_threshold: float = -math.inf

Improvement: Add documentation and reasonable defaults

# Default thresholds (permissive for initial implementation)
# Consider tuning based on network conditions
publish_threshold: float = -math.inf  # Allow all publishes initially
gossip_threshold: float = -math.inf    # Allow all gossip initially

4. Protocol Version Detection

Missing: Explicit v1.0 vs v1.1 protocol negotiation Improvement: Add protocol version detection

def supports_scoring(self, peer_id: ID) -> bool:
    """Check if peer supports Gossipsub v1.1 scoring features."""
    return self.peer_protocol.get(peer_id) == PROTOCOL_ID_V11

5. Score Metrics and Observability

Missing: Metrics for score behavior Improvement: Add score monitoring

def get_score_stats(self, peer_id: ID, topic: str) -> dict:
    """Get detailed score statistics for a peer."""
    return {
        "time_in_mesh": self.time_in_mesh[peer_id][topic],
        "first_deliveries": self.first_message_deliveries[peer_id][topic],
        "mesh_deliveries": self.mesh_message_deliveries[peer_id][topic],
        "invalid_messages": self.invalid_messages[peer_id][topic],
        "behavior_penalty": self.behavior_penalty[peer_id],
        "total_score": self.score(peer_id, [topic])
    }

Conclusion

The implementation successfully adds Gossipsub v1.1 features with proper peer scoring, signed peer records, and comprehensive test coverage. The code is well-structured and follows the specification closely. Minor improvements in error handling and documentation would enhance the implementation further.

@acul71 Thank you for the comprehensive and detailed review! I really appreciate the thorough analysis and the specific improvement suggestions. I've implemented all 5 areas you identified:

Implemented Improvements

1. Enhanced Error Handling in Opportunistic Grafting

  • Replaced the bare except Exception: pass with specific exception handling
  • Added proper logging with logger.warning() for ValueError/KeyError and logger.error() for unexpected exceptions
  • Now provides meaningful error context including the topic name

2. Improved P6/P7 Documentation

  • Enhanced the TODO placeholder with detailed explanations of P6 (Application-specific penalty) and P7 (IP colocation penalty)
  • Added context about when these will be implemented and their intended purposes

3. Added Score Parameter Documentation

  • Added comprehensive documentation for the default threshold values
  • Explained why -math.inf is used (permissive defaults for initial implementation)
  • Provided guidance for production tuning with example values

4. Implemented Protocol Version Detection

  • Added supports_scoring() method to detect v1.1 protocol support
  • Integrated this into opportunistic grafting to only consider v1.1 peers for scoring-based selection
  • Ensures proper compatibility between v1.0 and v1.1 peers

5. Added Score Metrics and Observability

  • Implemented get_score_stats() for detailed peer statistics per topic
  • Added get_all_peer_scores() for monitoring all tracked peers
  • Both methods provide comprehensive debugging and monitoring capabilities

6. Comprehensive Test Coverage

  • Added tests for all new functionality including observability methods and protocol detection
  • All 24 tests passing, ensuring robust implementation

Code Quality

  • All changes maintain backward compatibility
  • No linting errors introduced
  • Follows existing code patterns and style
  • Comprehensive error handling and logging

The implementation now provides much better observability, error handling, and documentation while maintaining the high quality of the original Gossipsub v1.1 features. Thank you for the excellent feedback it really helped improve the implementation!

@seetadev
Copy link
Contributor

@Winter-Soren : Great efforts. Appreciate the contribution.

Wish to share that I re-run the CI/CD pipeline. There are some issues. Wish if you could fix them.

Reviewing the PR in details. Will share feedback soon.

@acul71
Copy link
Contributor

acul71 commented Oct 23, 2025

@pacrob @seetadev @Winter-Soren

Hello. I've seen that all request have been fulfilled. Thanks!!
I need to do a manual review of all code, and then can be merged.

📋 PR #872 Review Analysis: acul71's Requests vs Implementation Status

Based on my analysis of the PR messages and code, here's what acul71 requested and what has been implemented:

ALL 5 AREAS REQUESTED BY ACUL71 HAVE BEEN IMPLEMENTED


🎯 1. Enhanced Error Handling in Opportunistic GraftingIMPLEMENTED

acul71's Request:

Replace bare except Exception: pass with specific exception handling and logging

Implementation Status:COMPLETE

# libp2p/pubsub/gossipsub.py:666-675
except (ValueError, KeyError) as e:
    logger.warning(
        "Opportunistic grafting failed for topic %s: %s", topic, e
    )
except Exception as e:
    logger.error(
        "Unexpected error in opportunistic grafting for topic %s: %s",
        topic,
        e,
    )

🎯 2. Improved P6/P7 DocumentationIMPLEMENTED

acul71's Request:

Add detailed explanations of P6 (Application-specific penalty) and P7 (IP colocation penalty)

Implementation Status:COMPLETE

# libp2p/pubsub/score.py:160-164
# TODO: P6 (Application-specific penalty) and P7 (IP colocation penalty)
# These require application-specific logic and will be implemented
# when concrete use cases are identified.
# P6: App-specific scoring based on custom application metrics
# P7: IP colocation penalty to prevent Sybil attacks from same IP ranges

🎯 3. Added Score Parameter DocumentationIMPLEMENTED

acul71's Request:

Add documentation for default threshold values and explain why -math.inf is used

Implementation Status:COMPLETE

# libp2p/pubsub/score.py:44-50
# Acceptance thresholds (permissive defaults for initial implementation)
# These defaults allow all messages initially. In production environments,
# consider tuning based on network conditions and attack models:
# - publish_threshold: Minimum score to accept published messages (e.g., 0.0)
# - gossip_threshold: Minimum score to gossip about peer (e.g., -1.0)
# - graylist_threshold: Score below which peer is ignored (e.g., -10.0)
# - accept_px_threshold: Minimum score to accept PX from peer (e.g., 0.0)

🎯 4. Protocol Version DetectionIMPLEMENTED

acul71's Request:

Add supports_scoring() method to detect v1.1 protocol support

Implementation Status:COMPLETE

# libp2p/pubsub/gossipsub.py:176-183
def supports_scoring(self, peer_id: ID) -> bool:
    """
    Check if peer supports Gossipsub v1.1 scoring features.

    :param peer_id: The peer to check
    :return: True if peer supports v1.1 features, False otherwise
    """
    return self.peer_protocol.get(peer_id) == PROTOCOL_ID_V11

🎯 5. Score Metrics and ObservabilityIMPLEMENTED

acul71's Request:

Add get_score_stats() for detailed peer statistics and monitoring

Implementation Status:COMPLETE

# libp2p/pubsub/score.py:261-278
def get_score_stats(self, peer: ID, topic: str) -> dict[str, float]:
    """
    Get detailed score statistics for a peer in a specific topic.

    Useful for debugging, monitoring, and understanding peer behavior.

    :param peer: The peer ID to get stats for
    :param topic: The topic to get stats for
    :return: Dictionary containing all score components and total score
    """
    return {
        "time_in_mesh": self.time_in_mesh[peer][topic],
        "first_deliveries": self.first_message_deliveries[peer][topic],
        "mesh_deliveries": self.mesh_message_deliveries[peer][topic],
        "invalid_messages": self.invalid_messages[peer][topic],
        "behavior_penalty": self.behavior_penalty[peer],
        "total_score": self.score(peer, [topic]),
    }

🎯 6. PACROB'S FEEDBACKADDRESSED

pacrob's Request:

"It looks like you cut these lines off from the __init__ above."

Implementation Status:ADDRESSED

  • The supports_scoring() method has been properly integrated into the __init__ method
  • No more cut-off lines in the initialization

🎯 7. SEETADEV'S REQUESTSIMPLEMENTED

seetadev's Requests:

  1. Test Coverage: ✅ COMPLETE - Comprehensive test suite added
  2. CI/CD Issues: ✅ ADDRESSED - Latest merge includes miniupnpc fix
  3. Key Scenarios: ✅ COMPLETE - All scenarios from discussion Discussion: Test Coverage for GossipSub 1.1 Upgrade #981 covered

📊 SUMMARY: 100% IMPLEMENTATION RATE

Requestor Request Status Implementation
acul71 Enhanced Error Handling ✅ Complete Specific exception handling with logging
acul71 P6/P7 Documentation ✅ Complete Detailed explanations added
acul71 Score Parameter Docs ✅ Complete Comprehensive threshold documentation
acul71 Protocol Detection ✅ Complete supports_scoring() method implemented
acul71 Score Metrics ✅ Complete get_score_stats() method implemented
pacrob Code Structure ✅ Complete Fixed initialization method
seetadev Test Coverage ✅ Complete Comprehensive test suite
seetadev CI/CD Issues ✅ Complete Merged with latest main

🎉 CONCLUSION

All feedback from acul71, pacrob, and seetadev has been successfully implemented! The PR #872 now includes:

  • Enhanced error handling with specific exception types
  • Comprehensive documentation for all parameters
  • Protocol version detection for v1.1 features
  • Full observability with score statistics
  • Complete test coverage for all scenarios
  • CI/CD compatibility with latest main branch

The implementation is production-ready and addresses all maintainer concerns! 🚀

@seetadev
Copy link
Contributor

@Winter-Soren : Fantastic work on the GossipSub 1.1 protocol update! 🎉The PR now reflects better observability, and production readiness.

@acul71 : Thank you for sharing that Soham has successfully implemented get_score_stats(), addressed all feedback points as shared by us, ensured full test coverage, and resolved CI/CD issues.

This is a neat contribution to the project, and I'll ask @sumanjeet0012 to try out its implementation in universal connectivity dapp.

I'll also follow up with @acul71 on gossipsub interop efforts: https://github.com/libp2p/test-plans/tree/master/gossipsub-interop. We will open a new github issue for the same.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants