Skip to content

Commit e281862

Browse files
committed
feat(node): implement PipeWrap for Windows
1 parent 8e5462c commit e281862

File tree

4 files changed

+77
-40
lines changed

4 files changed

+77
-40
lines changed

ext/net/04_pipe.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ async function write(rid, data) {
5959
}
6060

6161
class Pipe {
62-
#rid = 0;
62+
readonly #rid = 0;
6363
#unref = false;
6464
#pendingReadPromises = new SafeSet();
6565

@@ -76,11 +76,11 @@ class Pipe {
7676
this.#rid = rid;
7777
}
7878

79-
write(buffer) {
79+
write(buffer): Promise<number> {
8080
return write(this.#rid, buffer);
8181
}
8282

83-
async read(buffer) {
83+
async read(buffer): Promise<number> {
8484
if (buffer.length === 0) {
8585
return 0;
8686
}
@@ -101,7 +101,7 @@ class Pipe {
101101
close() {
102102
core.close(this.#rid);
103103
}
104-
get readable() {
104+
get readable(): ReadableStream<Uint8Array> {
105105
if (this.#readable === undefined) {
106106
this.#readable = readableStreamForRidUnrefable(this.#rid);
107107
if (this.#unref) {
@@ -111,7 +111,7 @@ class Pipe {
111111
return this.#readable;
112112
}
113113

114-
get writable() {
114+
get writable(): WritableStream<Uint8Array> {
115115
if (this.#writable === undefined) {
116116
this.#writable = writableStreamForRid(this.#rid);
117117
}
@@ -146,14 +146,14 @@ class Pipe {
146146
}
147147
}
148148

149-
function connect(opts: Options | WindowsConnectOptions) {
149+
async function connect(opts: Options | WindowsConnectOptions) {
150150
let rid: number;
151151
switch (opts.kind) {
152152
case "unix":
153153
rid = op_pipe_connect(opts.path, "Deno.pipe.connect");
154154
return new Pipe(rid);
155155
case "windows":
156-
rid = op_pipe_connect(opts, "Deno.pipe.connect");
156+
rid = await op_pipe_connect(opts, "Deno.pipe.connect");
157157
return new Pipe(rid);
158158
default:
159159
throw new Error(`Unsupported kind: ${opts.kind}`);

ext/net/ops_win_pipe.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ where
102102

103103
#[op2(stack_trace)]
104104
#[smi]
105-
pub fn op_pipe_connect<NP>(
105+
pub async fn op_pipe_connect<NP>(
106106
state: &mut OpState,
107107
#[serde] args: ConnectArgs,
108108
#[string] api_name: &str,
@@ -132,10 +132,9 @@ where
132132
};
133133

134134
let mut opts = named_pipe::ClientOptions::new();
135-
opts
136-
.read(args.read)
137-
.write(args.write);
135+
opts.read(args.read).write(args.write);
138136
let pipe = NamedPipe::new_client(path.as_ref(), &opts)?;
137+
pipe.connect().await?;
139138
let rid = state.resource_table.add(pipe);
140139
Ok(rid)
141140
}

ext/net/win_pipe.rs

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@
33
use std::borrow::Cow;
44
use std::ffi::OsStr;
55
use std::io;
6-
use std::path::Path;
76
use std::rc::Rc;
87

9-
use deno_core::AsyncRef;
108
use deno_core::AsyncRefCell;
119
use deno_core::AsyncResult;
1210
use deno_core::CancelHandle;
@@ -23,19 +21,18 @@ pub struct NamedPipe {
2321
}
2422

2523
enum Inner {
26-
Server((bool, named_pipe::NamedPipeServer)),
24+
Server(named_pipe::NamedPipeServer),
2725
Client(named_pipe::NamedPipeClient),
2826
}
2927

3028
impl NamedPipe {
31-
pub fn new_server(
29+
pub async fn new_server(
3230
addr: impl AsRef<OsStr>,
3331
options: &named_pipe::ServerOptions,
3432
) -> io::Result<NamedPipe> {
3533
let server = options.create(addr)?;
36-
// todo: should we connect here
3734
Ok(NamedPipe {
38-
inner: AsyncRefCell::new(Inner::Server((false, server))),
35+
inner: AsyncRefCell::new(Inner::Server(server)),
3936
cancel: Default::default(),
4037
})
4138
}
@@ -51,17 +48,20 @@ impl NamedPipe {
5148
})
5249
}
5350

51+
pub async fn connect(&self) -> io::Result<()> {
52+
match &self.inner {
53+
Inner::Server(ref inner) => {
54+
inner.connect().try_or_cancel(&self.cancel).await
55+
}
56+
Inner::Client(ref inner) => Ok(()),
57+
}
58+
}
59+
5460
pub async fn write(self: Rc<Self>, buf: &[u8]) -> io::Result<usize> {
5561
let mut inner = RcRef::map(&self, |s| &s.inner).borrow_mut().await;
5662
let cancel = RcRef::map(&self, |s| &s.cancel);
5763
match &mut *inner {
58-
Inner::Server((mut connected, server)) => {
59-
if !connected {
60-
server.connect().await?;
61-
connected = true;
62-
}
63-
server.write(buf).try_or_cancel(cancel).await
64-
},
64+
Inner::Server(server) => server.write(buf).try_or_cancel(cancel).await,
6565
Inner::Client(client) => client.write(buf).try_or_cancel(cancel).await,
6666
}
6767
}
@@ -70,13 +70,7 @@ impl NamedPipe {
7070
let mut inner = RcRef::map(&self, |s| &s.inner).borrow_mut().await;
7171
let cancel = RcRef::map(&self, |s| &s.cancel);
7272
match &mut *inner {
73-
Inner::Server((mut connected, server)) => {
74-
if !connected {
75-
server.connect().await?;
76-
connected = true;
77-
}
78-
server.read(buf).try_or_cancel(cancel).await
79-
},
73+
Inner::Server(server) => server.read(buf).try_or_cancel(cancel).await,
8074
Inner::Client(client) => client.read(buf).try_or_cancel(cancel).await,
8175
}
8276
}

ext/node/polyfills/internal_binding/pipe_wrap.ts

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ export class Pipe extends ConnectionWrap {
5959
override reading = false;
6060
ipc: boolean;
6161

62-
// REF: https://github.com/nodejs/node/blob/master/deps/uv/src/win/pipe.c#L48
62+
// REF: https://github.com/nodejs/node/blob/main/deps/uv/src/win/pipe.c#L48
6363
#pendingInstances = 4;
6464

6565
#address?: string;
@@ -71,7 +71,10 @@ export class Pipe extends ConnectionWrap {
7171
#closed = false;
7272
#acceptBackoffDelay?: number;
7373

74-
constructor(type: number, conn?: Deno.UnixConn | StreamBase) {
74+
constructor(
75+
type: number,
76+
conn?: Deno.UnixConn | Deno.PipeNs.Pipe | StreamBase,
77+
) {
7578
let provider: providerType;
7679
let ipc: boolean;
7780

@@ -137,8 +140,30 @@ export class Pipe extends ConnectionWrap {
137140
*/
138141
connect(req: PipeConnectWrap, address: string) {
139142
if (isWindows) {
140-
// REF: https://github.com/denoland/deno/issues/10244
141-
notImplemented("Pipe.prototype.connect - Windows");
143+
try {
144+
this[kStreamBaseField] = Deno.pipe.connect({
145+
path: address,
146+
kind: "windows",
147+
});
148+
this.#address = req.address;
149+
150+
try {
151+
this.afterConnect(req, 0);
152+
} catch {
153+
// swallow callback errors.
154+
}
155+
} catch (e) {
156+
const code = codeMap.get(e.code ?? "UNKNOWN") ??
157+
codeMap.get("UNKNOWN")!;
158+
159+
try {
160+
this.afterConnect(req, code);
161+
} catch {
162+
// swallow callback errors.
163+
}
164+
}
165+
166+
return 0;
142167
}
143168

144169
const connectOptions: Deno.UnixConnectOptions = {
@@ -180,15 +205,30 @@ export class Pipe extends ConnectionWrap {
180205
* @return An error status code.
181206
*/
182207
listen(backlog: number): number {
183-
if (isWindows) {
184-
// REF: https://github.com/denoland/deno/issues/10244
185-
notImplemented("Pipe.prototype.listen - Windows");
186-
}
187-
188208
this.#backlog = isWindows
189209
? this.#pendingInstances
190210
: ceilPowOf2(backlog + 1);
191211

212+
if (isWindows) {
213+
let listener;
214+
try {
215+
listener = Deno.pipe.listen({
216+
path: this.#address!,
217+
kind: "windows",
218+
pipeMode: "byte",
219+
});
220+
} catch (e) {
221+
if (e instanceof Deno.errors.NotCapable) {
222+
throw e;
223+
}
224+
return codeMap.get(e.code ?? "UNKNOWN") ?? codeMap.get("UNKNOWN")!;
225+
}
226+
227+
this.#listener = listener;
228+
229+
return 0;
230+
}
231+
192232
const listenOptions = {
193233
path: this.#address!,
194234
transport: "unix" as const,
@@ -299,6 +339,10 @@ export class Pipe extends ConnectionWrap {
299339
return;
300340
}
301341

342+
if (isWindows) {
343+
return;
344+
}
345+
302346
if (this.#connections > this.#backlog!) {
303347
this.#acceptBackoff();
304348

0 commit comments

Comments
 (0)