From 6d90c42e43f2370fcb375b7978834a4339062c16 Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Fri, 10 Oct 2025 10:08:52 -0700 Subject: [PATCH] Marco's review Signed-off-by: Marco Munizaga --- protocols/gossipsub/src/behaviour.rs | 34 +++++++++++++++++++--- protocols/gossipsub/src/behaviour/tests.rs | 2 ++ protocols/gossipsub/src/partial.rs | 3 ++ 3 files changed, 35 insertions(+), 4 deletions(-) diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index e7a5d3ec6..86b01ac7c 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -18,6 +18,14 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +// Some higher-level notes: +// +// Is having the `cfg` attribute everywhere for partial messages the best approach? +// In Go, we have a separate package for partial messages and define an interface for how it +// interacts with the rest of the gossipsub system. +// +// It also seems like this PR is missing a test for partial messages. + use std::{ cmp::{ max, @@ -659,6 +667,7 @@ where let topic_hash = raw_message.topic.clone(); + // Shouldn't this be filter_partial=false? let recipient_peers = self.get_publish_peers(&topic_hash, true); // If the message isn't a duplicate and we have sent it to some peers add it to the @@ -731,8 +740,9 @@ where .filter(|(_, peer)| { #[cfg(feature = "partial_messages")] { - if filter_partial && peer.partial_only_topics.contains(topic_hash) { - return false; + // This condition doesn't seem right; suggestion: + if filter_partial { + return peer.partial_only_topics.contains(topic_hash); } } let _ = peer; @@ -855,6 +865,7 @@ where let group_id = partial_message.group_id().as_ref().to_vec(); + // Shouldn't this be filter_partial=true? 
let recipient_peers = self.get_publish_peers(&topic_id, false); let metadata = partial_message.parts_metadata().as_ref().to_vec(); for peer_id in recipient_peers.iter() { @@ -871,6 +882,7 @@ where let Ok(action) = partial_message.partial_message_bytes_from_metadata(peer_partial.metadata.as_ref()) else { + // Note: you probably want some way to penalize this peer if they've sent you some invalid metadata. tracing::error!(peer = %peer_id, group_id = ?group_id, "Could not reconstruct message bytes for peer metadata"); peer_partials.remove(&group_id); @@ -880,11 +892,19 @@ where let message = match action { PublishAction::NothingToSend => { // No new data to send peer. + // + // You probably still want to send them your metadata in case they have things you want, or in case you have things they want (in case you haven't received their metadata yet). continue; } PublishAction::PeerHasAllData => { // Peer partial is now complete // remove it from the list + // I'm not sure you want to remove this. You want to keep track of the peer's metadata so that you can make this function idempotent. + // + // For example: + // Say you've just sent the peer the rest of the parts, and now they have a full message. + // If you remove their state, the next time you call publish_partial you will possibly send them more data (depending on the implementation of `partial_message_bytes_from_metadata`) + // and you'll send them your metadata even if you already have a full message. peer_partials.remove(&group_id); continue; } @@ -900,6 +920,11 @@ where } }; + // You should avoid sending duplicate messages. If you've already sent this peer some + // metadata, you should avoid later sending them another duplicate message with the same + // metadata. + // + // In other words, this function should be idempotent. 
self.send_message( *peer_id, RpcOut::PartialMessage { @@ -2760,7 +2785,8 @@ where for topics in peer.partial_messages.values_mut() { topics.retain(|_, partial| { partial.ttl -= 1; - partial.ttl <= 0 + // Clippy points out that, because this is a usize, it can never be < 0. Also note that `retain` keeps the entries for which the closure returns true, so this (like the original `<= 0`) keeps only the expired entries and drops the live ones; and once an entry's ttl is 0, the `-= 1` above underflows (a panic in debug builds). This probably wants `partial.ttl = partial.ttl.saturating_sub(1); partial.ttl > 0`. + partial.ttl == 0 }); } } @@ -3789,7 +3815,7 @@ fn get_random_peers_dynamic( .filter_map(|(peer_id, peer)| { #[cfg(feature = "partial_messages")] { - if partial && peer.partial_only_topics.contains(topic_hash) { + if partial && !peer.partial_only_topics.contains(topic_hash) { return None; } } diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index 9d8c27679..18dd7f5e0 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -20,6 +20,8 @@ // Collection of tests for the gossipsub network behaviour +// Is this file not rustfmt'ed? + use std::{net::Ipv4Addr, thread::sleep}; use asynchronous_codec::{Decoder, Encoder}; diff --git a/protocols/gossipsub/src/partial.rs b/protocols/gossipsub/src/partial.rs index 455a811a2..42d82e5fe 100644 --- a/protocols/gossipsub/src/partial.rs +++ b/protocols/gossipsub/src/partial.rs @@ -67,6 +67,8 @@ pub trait Partial { /// /// Returns `Ok(())` if the data was successfully integrated, or `Err`, /// if the data was invalid or couldn't be processed. + // Note: in the Go implementation we removed this method from the interface, since the application ultimately gets the message and can do whatever it wants with it. + // This also seems unused currently. fn extend_from_encoded_partial_message( &mut self, data: &[u8], @@ -83,6 +85,7 @@ pub enum PublishAction { NothingToSend, /// We have something of interest to this peer, but can not send everything it needs. Send a /// message and associate some new metadata to the peer, representing the remaining need. + // We changed this a bit in the Go implementation. We now have a separate function for merging metadata. 
This prevents duplicates under a certain ordering. More details here: https://github.com/libp2p/go-libp2p-pubsub/commit/eb2f04a3205d1ae8f755a5eb96b79696d1ae1d15 Send { message: Vec, metadata: Vec }, /// We can send everything this peer needs. Send message, then do not keep track of the peer /// anymore. -- 2.50.1