Skip to content

Commit 6b5ff4d

Browse files
authored
feat: allow only defined list of topics (#348)
* Allow only defined list of topics * Allow allowedTopics unit test
1 parent 27bdee7 commit 6b5ff4d

File tree

2 files changed

+94
-21
lines changed

2 files changed

+94
-21
lines changed

src/index.ts

Lines changed: 37 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ export interface GossipsubOpts extends GossipsubOptsSpec, PubSubInit {
157157
* If full it will throw and reject sending any more data.
158158
*/
159159
maxOutboundBufferSize?: number
160+
161+
/**
162+
* If provided, only allow topics in this list
163+
*/
164+
allowedTopics?: string[] | Set<string>
160165
}
161166

162167
export interface GossipsubMessage {
@@ -339,6 +344,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
339344
private status: GossipStatus = { code: GossipStatusCode.stopped }
340345
private maxInboundStreams?: number
341346
private maxOutboundStreams?: number
347+
private allowedTopics: Set<TopicStr> | null
342348

343349
private heartbeatTimer: {
344350
_intervalId: ReturnType<typeof setInterval> | undefined
@@ -462,6 +468,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
462468

463469
this.maxInboundStreams = options.maxInboundStreams
464470
this.maxOutboundStreams = options.maxOutboundStreams
471+
472+
this.allowedTopics = opts.allowedTopics ? new Set(opts.allowedTopics) : null
465473
}
466474

467475
getPeers(): PeerId[] {
@@ -918,23 +926,29 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
918926
// Handle received subscriptions
919927
if (rpc.subscriptions && rpc.subscriptions.length > 0) {
920928
// update peer subscriptions
929+
930+
const subscriptions: { topic: TopicStr; subscribe: boolean }[] = []
931+
921932
rpc.subscriptions.forEach((subOpt) => {
922-
this.handleReceivedSubscription(from, subOpt)
933+
const topic = subOpt.topic
934+
const subscribe = subOpt.subscribe === true
935+
936+
if (topic != null) {
937+
if (this.allowedTopics && !this.allowedTopics.has(topic)) {
938+
// Not allowed: subscription data-structures are not bounded by topic count
939+
// TODO: Should apply behaviour penalties?
940+
return
941+
}
942+
943+
this.handleReceivedSubscription(from, topic, subscribe)
944+
945+
subscriptions.push({ topic, subscribe })
946+
}
923947
})
924948

925949
this.dispatchEvent(
926950
new CustomEvent<SubscriptionChangeData>('subscription-change', {
927-
detail: {
928-
peerId: from,
929-
subscriptions: rpc.subscriptions
930-
.filter((sub) => sub.topic !== null)
931-
.map((sub) => {
932-
return {
933-
topic: sub.topic ?? '',
934-
subscribe: Boolean(sub.subscribe)
935-
}
936-
})
937-
}
951+
detail: { peerId: from, subscriptions }
938952
})
939953
)
940954
}
@@ -943,6 +957,12 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
943957
// TODO: (up to limit)
944958
if (rpc.messages) {
945959
for (const message of rpc.messages) {
960+
if (this.allowedTopics && !this.allowedTopics.has(message.topic)) {
961+
// Not allowed: message cache data-structures are not bounded by topic count
962+
// TODO: Should apply behaviour penalties?
963+
continue
964+
}
965+
946966
const handleReceivedMessagePromise = this.handleReceivedMessage(from, message)
947967
// Should never throw, but handle just in case
948968
.catch((err) => this.log(err))
@@ -962,20 +982,16 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
962982
/**
963983
* Handles a subscription change from a peer
964984
*/
965-
private handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
966-
if (subOpt.topic == null) {
967-
return
968-
}
969-
970-
this.log('subscription update from %p topic %s', from, subOpt.topic)
985+
private handleReceivedSubscription(from: PeerId, topic: TopicStr, subscribe: boolean): void {
986+
this.log('subscription update from %p topic %s', from, topic)
971987

972-
let topicSet = this.topics.get(subOpt.topic)
988+
let topicSet = this.topics.get(topic)
973989
if (topicSet == null) {
974990
topicSet = new Set()
975-
this.topics.set(subOpt.topic, topicSet)
991+
this.topics.set(topic, topicSet)
976992
}
977993

978-
if (subOpt.subscribe) {
994+
if (subscribe) {
979995
// subscribe peer to new topic
980996
topicSet.add(from.toString())
981997
} else {

test/allowedTopics.spec.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import { expect } from 'aegir/chai'
2+
import { GossipSub } from '../src/index.js'
3+
import { pEvent } from 'p-event'
4+
import { connectAllPubSubNodes, createComponentsArray } from './utils/create-pubsub.js'
5+
import { Components } from '@libp2p/components'
6+
import { stop } from '@libp2p/interfaces/startable'
7+
import { mockNetwork } from '@libp2p/interface-mocks'
8+
9+
/* eslint-disable dot-notation */
10+
describe('gossip / allowedTopics', () => {
11+
let nodes: Components[]
12+
13+
const allowedTopic = 'topic_allowed'
14+
const notAllowedTopic = 'topic_not_allowed'
15+
const allowedTopics = [allowedTopic]
16+
const allTopics = [allowedTopic, notAllowedTopic]
17+
18+
// Create pubsub nodes
19+
beforeEach(async () => {
20+
mockNetwork.reset()
21+
nodes = await createComponentsArray({
22+
number: 2,
23+
connected: false,
24+
init: {
25+
allowedTopics
26+
}
27+
})
28+
})
29+
30+
afterEach(async () => {
31+
await stop(...nodes)
32+
mockNetwork.reset()
33+
})
34+
35+
it('should send gossip to non-mesh peers in topic', async function () {
36+
this.timeout(10 * 1000)
37+
const [nodeA, nodeB] = nodes
38+
39+
// add subscriptions to each node
40+
for (const topic of allTopics) {
41+
nodeA.getPubSub().subscribe(topic)
42+
}
43+
44+
// every node connected to every other
45+
await Promise.all([
46+
connectAllPubSubNodes(nodes),
47+
// nodeA should send nodeB all its subscriptions on connection
48+
pEvent(nodeB.getPubSub(), 'subscription-change')
49+
])
50+
51+
const nodeASubscriptions = Array.from((nodeA.getPubSub() as GossipSub)['subscriptions'].keys())
52+
expect(nodeASubscriptions).deep.equals(allTopics, 'nodeA.subscriptions should be subcribed to all')
53+
54+
const nodeBTopics = Array.from((nodeB.getPubSub() as GossipSub)['topics'].keys())
55+
expect(nodeBTopics).deep.equals(allowedTopics, 'nodeB.topics should only contain allowedTopics')
56+
})
57+
})

0 commit comments

Comments
 (0)