Skip to content

Commit e39b2e2

Browse files
authored
fix: remove abortable iterator (#488)
AbortableSource is slow because it races promises against every chunk causing extra async work. It's only really necessary if we're going to pass the source off to another component. Here we don't do that so it's simpler to just add a listener for the abort event and close the stream.
1 parent b4e6a8d commit e39b2e2

File tree

2 files changed

+18
-11
lines changed

2 files changed

+18
-11
lines changed

package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,6 @@
7878
"@libp2p/peer-id": "^4.0.5",
7979
"@libp2p/pubsub": "^9.0.8",
8080
"@multiformats/multiaddr": "^12.1.14",
81-
"abortable-iterator": "^5.0.1",
8281
"denque": "^2.1.0",
8382
"it-length-prefixed": "^9.0.4",
8483
"it-pipe": "^3.0.1",

src/stream.ts

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import { abortableSource } from 'abortable-iterator'
21
import { encode, decode } from 'it-length-prefixed'
32
import { pipe } from 'it-pipe'
43
import { pushable, type Pushable } from 'it-pushable'
@@ -25,8 +24,15 @@ export class OutboundStream {
2524
this.closeController = new AbortController()
2625
this.maxBufferSize = opts.maxBufferSize ?? Infinity
2726

27+
this.closeController.signal.addEventListener('abort', () => {
28+
rawStream.close()
29+
.catch(err => {
30+
rawStream.abort(err)
31+
})
32+
})
33+
2834
pipe(
29-
abortableSource(this.pushable, this.closeController.signal, { returnOnAbort: true }),
35+
this.pushable,
3036
this.rawStream
3137
).catch(errCallback)
3238
}
@@ -59,7 +65,6 @@ export class OutboundStream {
5965
this.closeController.abort()
6066
// similar to pushable.end() but clear the internal buffer
6167
await this.pushable.return()
62-
await this.rawStream.close()
6368
}
6469
}
6570

@@ -73,17 +78,20 @@ export class InboundStream {
7378
this.rawStream = rawStream
7479
this.closeController = new AbortController()
7580

76-
this.source = abortableSource(
77-
pipe(this.rawStream, (source) => decode(source, opts)),
78-
this.closeController.signal,
79-
{
80-
returnOnAbort: true
81-
}
81+
this.closeController.signal.addEventListener('abort', () => {
82+
rawStream.close()
83+
.catch(err => {
84+
rawStream.abort(err)
85+
})
86+
})
87+
88+
this.source = pipe(
89+
this.rawStream,
90+
(source) => decode(source, opts)
8291
)
8392
}
8493

8594
async close (): Promise<void> {
8695
this.closeController.abort()
87-
await this.rawStream.close()
8896
}
8997
}

0 commit comments

Comments
 (0)