@@ -10,7 +10,7 @@ import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
10
10
import { MessageCache } from './message-cache.js'
11
11
import { RPC , IRPC } from './message/rpc.js'
12
12
import * as constants from './constants.js'
13
- import { createGossipRpc , shuffle , messageIdToString } from './utils/index.js'
13
+ import { shuffle , messageIdToString } from './utils/index.js'
14
14
import {
15
15
PeerScore ,
16
16
PeerScoreParams ,
@@ -1150,8 +1150,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1150
1150
*/
1151
1151
private sendSubscriptions ( toPeer : PeerIdStr , topics : string [ ] , subscribe : boolean ) : void {
1152
1152
this . sendRpc ( toPeer , {
1153
- subscriptions : topics . map ( ( topic ) => ( { topic, subscribe } ) ) ,
1154
- messages : [ ]
1153
+ subscriptions : topics . map ( ( topic ) => ( { topic, subscribe } ) )
1155
1154
} )
1156
1155
}
1157
1156
@@ -1172,7 +1171,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1172
1171
return
1173
1172
}
1174
1173
1175
- this . sendRpc ( id , createGossipRpc ( ihave , { iwant, prune } ) )
1174
+ this . sendRpc ( id , { messages : ihave , control : { iwant, prune } } )
1176
1175
}
1177
1176
1178
1177
/**
@@ -1915,10 +1914,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1915
1914
// Note: Don't throw if tosend is empty, we can have a mesh with a single peer
1916
1915
1917
1916
// forward the message to peers
1918
- const rpc = createGossipRpc ( [ rawMsg ] )
1919
1917
tosend . forEach ( ( id ) => {
1920
- // self.send_message(*peer_id, event.clone())?;
1921
- this . sendRpc ( id , rpc )
1918
+ // sendRpc may mutate RPC message on piggyback, create a new message for each peer
1919
+ this . sendRpc ( id , { messages : [ rawMsg ] } )
1922
1920
} )
1923
1921
1924
1922
this . metrics ?. onForwardMsg ( rawMsg . topic , tosend . size )
@@ -1967,11 +1965,9 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
1967
1965
this . publishedMessageIds . put ( msgIdStr )
1968
1966
1969
1967
// Send to set of peers aggregated from direct, mesh, fanout
1970
- const rpc = createGossipRpc ( [ rawMsg ] )
1971
-
1972
1968
for ( const id of tosend ) {
1973
- // self.send_message(*peer_id, event.clone())?;
1974
- const sent = this . sendRpc ( id , rpc )
1969
+ // sendRpc may mutate RPC message on piggyback, create a new message for each peer
1970
+ const sent = this . sendRpc ( id , { messages : [ rawMsg ] } )
1975
1971
1976
1972
// did not actually send the message
1977
1973
if ( ! sent ) {
@@ -2072,8 +2068,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2072
2068
}
2073
2069
]
2074
2070
2075
- const out = createGossipRpc ( [ ] , { graft } )
2076
- this . sendRpc ( id , out )
2071
+ this . sendRpc ( id , { control : { graft } } )
2077
2072
}
2078
2073
2079
2074
/**
@@ -2082,8 +2077,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2082
2077
private async sendPrune ( id : PeerIdStr , topic : string ) : Promise < void > {
2083
2078
const prune = [ await this . makePrune ( id , topic , this . opts . doPX ) ]
2084
2079
2085
- const out = createGossipRpc ( [ ] , { prune } )
2086
- this . sendRpc ( id , out )
2080
+ this . sendRpc ( id , { control : { prune } } )
2087
2081
}
2088
2082
2089
2083
/**
@@ -2132,30 +2126,32 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2132
2126
return true
2133
2127
}
2134
2128
2129
+ /** Mutates `outRpc` adding graft and prune control messages */
2135
2130
public piggybackControl ( id : PeerIdStr , outRpc : IRPC , ctrl : RPC . IControlMessage ) : void {
2136
- const tograft = ( ctrl . graft || [ ] ) . filter ( ( { topicID } ) =>
2137
- ( ( topicID && this . mesh . get ( topicID ) ) || new Set ( ) ) . has ( id )
2138
- )
2139
- const toprune = ( ctrl . prune || [ ] ) . filter (
2140
- ( { topicID } ) => ! ( ( topicID && this . mesh . get ( topicID ) ) || new Set ( ) ) . has ( id )
2141
- )
2142
-
2143
- if ( ! tograft . length && ! toprune . length ) {
2144
- return
2131
+ if ( ctrl . graft ) {
2132
+ if ( ! outRpc . control ) outRpc . control = { }
2133
+ if ( ! outRpc . control . graft ) outRpc . control . graft = [ ]
2134
+ for ( const graft of ctrl . graft ) {
2135
+ if ( graft . topicID && this . mesh . get ( graft . topicID ) ?. has ( id ) ) {
2136
+ outRpc . control . graft . push ( graft )
2137
+ }
2138
+ }
2145
2139
}
2146
2140
2147
- if ( outRpc . control ) {
2148
- outRpc . control . graft = outRpc . control . graft && outRpc . control . graft . concat ( tograft )
2149
- outRpc . control . prune = outRpc . control . prune && outRpc . control . prune . concat ( toprune )
2150
- } else {
2151
- outRpc . control = { graft : tograft , prune : toprune , ihave : [ ] , iwant : [ ] }
2141
+ if ( ctrl . prune ) {
2142
+ if ( ! outRpc . control ) outRpc . control = { }
2143
+ if ( ! outRpc . control . prune ) outRpc . control . prune = [ ]
2144
+ for ( const prune of ctrl . prune ) {
2145
+ if ( prune . topicID && this . mesh . get ( prune . topicID ) ?. has ( id ) ) {
2146
+ outRpc . control . prune . push ( prune )
2147
+ }
2148
+ }
2152
2149
}
2153
2150
}
2154
2151
2152
+ /** Mutates `outRpc` adding ihave control messages */
2155
2153
private piggybackGossip ( id : PeerIdStr , outRpc : IRPC , ihave : RPC . IControlIHave [ ] ) : void {
2156
- if ( ! outRpc . control ) {
2157
- outRpc . control = { ihave : [ ] , iwant : [ ] , graft : [ ] , prune : [ ] }
2158
- }
2154
+ if ( ! outRpc . control ) outRpc . control = { }
2159
2155
outRpc . control . ihave = ihave
2160
2156
}
2161
2157
@@ -2183,15 +2179,13 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2183
2179
toprune . delete ( id )
2184
2180
}
2185
2181
2186
- const outRpc = createGossipRpc ( [ ] , { graft, prune } )
2187
- this . sendRpc ( id , outRpc )
2182
+ this . sendRpc ( id , { control : { graft, prune } } )
2188
2183
}
2189
2184
for ( const [ id , topics ] of toprune ) {
2190
2185
const prune = await Promise . all (
2191
2186
topics . map ( async ( topicID ) => await this . makePrune ( id , topicID , doPX && ! ( noPX . get ( id ) ?? false ) ) )
2192
2187
)
2193
- const outRpc = createGossipRpc ( [ ] , { prune } )
2194
- this . sendRpc ( id , outRpc )
2188
+ this . sendRpc ( id , { control : { prune } } )
2195
2189
}
2196
2190
}
2197
2191
@@ -2264,12 +2258,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
2264
2258
// send gossip first, which will also piggyback control
2265
2259
for ( const [ peer , ihave ] of this . gossip . entries ( ) ) {
2266
2260
this . gossip . delete ( peer )
2267
- this . sendRpc ( peer , createGossipRpc ( [ ] , { ihave } ) )
2261
+ this . sendRpc ( peer , { control : { ihave } } )
2268
2262
}
2269
2263
// send the remaining control messages
2270
2264
for ( const [ peer , control ] of this . control . entries ( ) ) {
2271
2265
this . control . delete ( peer )
2272
- this . sendRpc ( peer , createGossipRpc ( [ ] , { graft : control . graft , prune : control . prune } ) )
2266
+ this . sendRpc ( peer , { control : { graft : control . graft , prune : control . prune } } )
2273
2267
}
2274
2268
}
2275
2269
0 commit comments