diff --git a/Cargo.lock b/Cargo.lock index 1744451825ba72..659f2fe9dc5264 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2245,6 +2245,7 @@ dependencies = [ "deno_tls", "hickory-proto", "hickory-resolver", + "nix 0.27.1", "pin-project", "quinn", "rustls-tokio-stream", @@ -10255,3 +10256,7 @@ checksum = "16099418600b4d8f028622f73ff6e3deaabdff330fb9a2a131dea781ee8b0768" dependencies = [ "zune-core", ] + +[[patch.unused]] +name = "deno_core" +version = "0.346.0" diff --git a/cli/tsc/dts/lib.deno_net.d.ts b/cli/tsc/dts/lib.deno_net.d.ts index a76c3dd82f3cfc..0425c20f9e8811 100644 --- a/cli/tsc/dts/lib.deno_net.d.ts +++ b/cli/tsc/dts/lib.deno_net.d.ts @@ -953,5 +953,138 @@ declare namespace Deno { conn: QuicConn, ): Promise; + /** + * @category Network + * Named pipes for Deno + */ + export namespace PipeNs { + /** @category Network */ + export interface Pipe extends Disposable { + /** Read the incoming data from the pipe into an array buffer (`p`). + * + * Resolves to either the number of bytes read during the operation or EOF + * (`null`) if there was nothing more to read. + * + * It is possible for a read to successfully return with `0` bytes. This + * does not indicate EOF. + * + * **It is not guaranteed that the full buffer will be read in a single + * call.** + * + * ```ts + * // If the text "hello world" is received by the client: + * const pipe = await Deno.pipe.listen({ path: "/tmp/my-pipe" }); + * const buf = new Uint8Array(100); + * const numberOfBytesRead = await pipe.read(buf); // 11 bytes + * const text = new TextDecoder().decode(buf); // "hello world" + * ``` + * + * @category I/O + */ + read(p: Uint8Array): Promise; + /** Write the contents of the array buffer (`p`) to the pipe. + * + * Resolves to the number of bytes written. + * + * **It is not guaranteed that the full buffer will be written in a single + * call.** + * + * ```ts + * const pipe = await Deno.pipe.connect({ path: "/tmp/my-pipe", create: true }); + * const encoder = new TextEncoder(); + * const data = encoder.encode("Hello world"); + * const bytesWritten = await pipe.write(data); // 11 + * ``` + * + * @category I/O + */ + write(p: Uint8Array): Promise; + /** Closes the pipe, freeing the resource. + * + * ```ts + * const pipe = await Deno.pipe.connect({ path: "/tmp/my-pipe" }); + * + * // ... + * + * pipe.close(); + * ``` + */ + close(): void; + + /** Make the pipe block the event loop from finishing. + * + * Note: the pipe blocks the event loop from finishing by default. + * This method is only meaningful after `.unref()` is called. + */ + ref(): void; + /** Make the pipe not block the event loop from finishing. */ + unref(): void; + + readonly readable: ReadableStream>; + readonly writable: WritableStream>; + } + + /** + * Pipe Options + * + * @category Network + */ + export interface Options { + path: string; + kind: Kind; + } + + /** @category Network */ + export interface WindowsListenOptions extends Options { + kind: "windows"; + maxInstances?: number; + pipeMode: PipeMode; + inbound?: boolean; + outbound?: boolean; + } + + /** @category Network */ + export interface UnixListenOptions extends Options { + kind: "unix"; + mode?: number; + create?: boolean; + } + + /** + * @category Network + * + * Connect to a named pipe. + */ + export function connect(options: Options): Pipe; + + /** + * Connect to a named pipe. + * + * @category Network + */ + export function listen(options: Options): Pipe; + + /** + * Connect to a named pipe. + * + * only works on Windows. + * + * @category Network + */ + export function listen(options: WindowsListenOptions): Pipe; + + /** + * Connect to a named pipe. + * + * only works on Unix-like systems. + * + * @category Network + */ + export function listen(options: UnixListenOptions): Pipe; + } + + /** @category Network */ + export const pipe: PipeNs; + export {}; // only export exports } diff --git a/ext/net/04_pipe.ts b/ext/net/04_pipe.ts new file mode 100644 index 00000000000000..7325bafba1e2d9 --- /dev/null +++ b/ext/net/04_pipe.ts @@ -0,0 +1,177 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +import { core, primordials } from "ext:core/mod.js"; + +const { + internalRidSymbol, +} = core; +import { op_pipe_connect, op_pipe_listen } from "ext:core/ops"; +const { + Error, + ObjectDefineProperty, + SymbolDispose, + SafeSet, + SetPrototypeAdd, + SetPrototypeDelete, + SetPrototypeForEach, +} = primordials; +import { + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, + writableStreamForRid, +} from "ext:deno_web/06_streams.js"; + +enum PipeMode { + Message = "message", + Byte = "byte", +} + +type Kind = "unix" | "windows"; + +interface Options { + path: string; + kind: Kind; +} + +interface WindowsListenOptions extends Options { + kind: "windows"; + maxInstances?: number; + pipeMode: PipeMode; + inbound?: boolean; + outbound?: boolean; +} + +interface WindowsConnectOptions extends Options { + kind: "windows"; + read?: boolean; + write?: boolean; +} + +interface UnixListenOptions extends Options { + kind: "unix"; + mode?: number; + create?: boolean; +} + +async function write(rid, data) { + return await core.write(rid, data); +} + +class Pipe { + readonly #rid = 0; + #unref = false; + #pendingReadPromises = new SafeSet(); + + #readable; + #writable; + + constructor(rid: number) { + ObjectDefineProperty(this, internalRidSymbol, { + __proto__: null, + enumerable: false, + value: rid, + }); + + this.#rid = rid; + } + + write(buffer): Promise { + return write(this.#rid, buffer); + } + + async read(buffer): Promise { + if (buffer.length === 0) { + return 0; + } + const promise = core.read(this.#rid, buffer); + if (this.#unref) core.unrefOpPromise(promise); + SetPrototypeAdd(this.#pendingReadPromises, promise); + let nread; + try { + nread = await promise; + } catch (e) { + throw e; + } finally { + SetPrototypeDelete(this.#pendingReadPromises, promise); + } + return nread === 0 ? null : nread; + } + + close() { + core.close(this.#rid); + } + get readable(): ReadableStream { + if (this.#readable === undefined) { + this.#readable = readableStreamForRidUnrefable(this.#rid); + if (this.#unref) { + readableStreamForRidUnrefableUnref(this.#readable); + } + } + return this.#readable; + } + + get writable(): WritableStream { + if (this.#writable === undefined) { + this.#writable = writableStreamForRid(this.#rid); + } + return this.#writable; + } + + ref() { + this.#unref = false; + if (this.#readable) { + readableStreamForRidUnrefableRef(this.#readable); + } + + SetPrototypeForEach( + this.#pendingReadPromises, + (promise) => core.refOpPromise(promise), + ); + } + + unref() { + this.#unref = true; + if (this.#readable) { + readableStreamForRidUnrefableUnref(this.#readable); + } + SetPrototypeForEach( + this.#pendingReadPromises, + (promise) => core.unrefOpPromise(promise), + ); + } + + [SymbolDispose]() { + core.tryClose(this.#rid); + } +} + +async function connect(opts: Options | WindowsConnectOptions) { + let rid: number; + switch (opts.kind) { + case "unix": + rid = op_pipe_connect(opts.path, "Deno.pipe.connect"); + return new Pipe(rid); + case "windows": + rid = await op_pipe_connect(opts, "Deno.pipe.connect"); + return new Pipe(rid); + default: + throw new Error(`Unsupported kind: ${opts.kind}`); + } +} + +function listen(opts: WindowsListenOptions | UnixListenOptions) { + let rid: number; + switch (opts.kind) { + case "unix": + rid = op_pipe_listen(opts, "Deno.pipe.listen"); + return new Pipe(rid); + case "windows": + rid = op_pipe_listen(opts, "Deno.pipe.listen"); + return new Pipe(rid); + default: + throw new Error(`Unsupported kind: ${opts.kind}`); + } +} + +export { connect, listen }; diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index 4f015a7140eaea..22b6b4537f8c11 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -21,6 +21,7 @@ deno_permissions.workspace = true deno_tls.workspace = true hickory-proto.workspace = true hickory-resolver.workspace = true +nix = { workspace = true, features = ["fs"] } pin-project.workspace = true quinn = { workspace = true, features = ["runtime-tokio", "rustls", "ring"] } rustls-tokio-stream.workspace = true diff --git a/ext/net/lib.rs b/ext/net/lib.rs index 06ac54d93bfe68..153b85f2aea51e 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -2,6 +2,12 @@ pub mod io; pub mod ops; +#[cfg(unix)] +#[path = "ops_unix_pipe.rs"] +mod ops_pipe; +#[cfg(windows)] +#[path = "ops_win_pipe.rs"] +mod ops_pipe; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; @@ -9,6 +15,10 @@ mod quic; pub mod raw; pub mod resolve_addr; pub mod tcp; +#[cfg(unix)] +mod unix_pipe; +#[cfg(windows)] +mod win_pipe; use std::borrow::Cow; use std::path::Path; @@ -220,8 +230,11 @@ deno_core::extension!(deno_net, quic::op_quic_send_stream_set_priority, quic::webtransport::op_webtransport_accept, quic::webtransport::op_webtransport_connect, + + ops_pipe::op_pipe_listen

, + ops_pipe::op_pipe_connect

, ], - esm = [ "01_net.js", "02_tls.js" ], + esm = [ "01_net.js", "02_tls.js", "04_pipe.ts" ], lazy_loaded_esm = [ "03_quic.js" ], options = { root_cert_store_provider: Option>, diff --git a/ext/net/ops_unix_pipe.rs b/ext/net/ops_unix_pipe.rs new file mode 100644 index 00000000000000..35b98cda5b62a0 --- /dev/null +++ b/ext/net/ops_unix_pipe.rs @@ -0,0 +1,117 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use std::borrow::Cow; +use std::io; +use std::path::Path; + +use deno_core::op2; +use deno_core::OpState; +use deno_core::ResourceId; +use serde::Deserialize; + +use crate::ops::NetError; +use crate::unix_pipe::NamedPipe; +use crate::NetPermissions; + +fn is_false() -> bool { + false +} + +#[derive(Deserialize, Debug)] +struct ListenArgs { + path: String, + mode: Option, + #[serde(default = "is_false")] + create: bool, +} + +pub fn create_pipe<'a, NP>( + permissions: &'a mut NP, + path: &str, + mode: nix::sys::stat::Mode, + api_name: &str, +) -> Result, NetError> +where + NP: NetPermissions + 'static, +{ + let path = { + let path = permissions + .check_read(path, api_name) + .map_err(NetError::Permission)?; + + permissions + .check_write_path(Cow::Owned(path), api_name) + .map_err(NetError::Permission)? + }; + + nix::unistd::mkfifo(path.as_ref(), mode).map_err(io::Error::from)?; + + Ok(path) +} + +#[op2(stack_trace)] +#[serde] +pub fn op_pipe_listen( + state: &mut OpState, + #[serde] args: ListenArgs, + #[string] api_name: String, +) -> Result +where + NP: NetPermissions + 'static, +{ + let permissions = state.borrow_mut::(); + + let api_call_expr = format!("{}()", api_name); + let path = if args.create { + create_pipe( + permissions, + &args.path, + args + .mode + .map(|mode| nix::sys::stat::Mode::from_bits(mode as _)) + .unwrap_or(Some(nix::sys::stat::Mode::S_IRWXU)) + .ok_or(NetError::Io(io::ErrorKind::InvalidInput.into()))?, + &api_call_expr, + )? + } else { + let path = permissions + .check_read(&args.path, &api_call_expr) + .map_err(NetError::Permission)?; + + permissions + .check_write_path(Cow::Owned(path), &api_call_expr) + .map_err(NetError::Permission)? + }; + + let pipe = NamedPipe::new_receiver(path)?; + let rid = state.resource_table.add(pipe); + Ok(rid) +} + +#[op2(fast, stack_trace)] +#[smi] +pub fn op_pipe_connect( + state: &mut OpState, + #[string] path: String, + #[string] api_name: &str, +) -> Result +where + NP: NetPermissions + 'static, +{ + let permissions = state.borrow_mut::(); + + let api_call_expr = format!("{}()", api_name); + let path = { + let path = permissions + .check_read(&path, &api_call_expr) + .map_err(NetError::Permission)?; + + permissions + .check_write_path(Cow::Owned(path), &api_call_expr) + .map_err(NetError::Permission)? + }; + + let pipe = NamedPipe::new_sender(path)?; + let rid = state.resource_table.add(pipe); + Ok(rid) +} diff --git a/ext/net/ops_win_pipe.rs b/ext/net/ops_win_pipe.rs new file mode 100644 index 00000000000000..9b65f820c02549 --- /dev/null +++ b/ext/net/ops_win_pipe.rs @@ -0,0 +1,140 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use std::borrow::Cow; +use std::cell::RefCell; +use std::path::PathBuf; +use std::rc::Rc; + +use deno_core::op2; +use deno_core::JsBuffer; +use deno_core::OpState; +use deno_core::Resource; +use deno_core::ResourceId; +use serde::Deserialize; +use tokio::net::windows::named_pipe; + +use crate::ops::NetError; +use crate::win_pipe::NamedPipe; +use crate::NetPermissions; + +fn is_true() -> bool { + true +} + +#[derive(Deserialize, Debug)] +pub enum PipeMode { + #[serde(rename = "message")] + Message, + #[serde(rename = "byte")] + Byte, +} + +impl From for named_pipe::PipeMode { + fn from(value: PipeMode) -> Self { + match value { + PipeMode::Message => named_pipe::PipeMode::Message, + PipeMode::Byte => named_pipe::PipeMode::Byte, + } + } +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct ListenArgs { + path: String, + max_instances: Option, + pipe_mode: PipeMode, + #[serde(default = "is_true")] + inbound: bool, + #[serde(default = "is_true")] + outbound: bool, +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct ConnectArgs { + path: String, + #[serde(default = "is_true")] + read: bool, + #[serde(default = "is_true")] + write: bool, +} + +#[op2(stack_trace)] +#[serde] +pub fn op_pipe_listen( + state: &mut OpState, + #[serde] args: ListenArgs, + #[string] api_name: String, +) -> Result +where + NP: NetPermissions + 'static, +{ + let permissions = state.borrow_mut::(); + + let api_call_expr = format!("{}()", api_name); + let path = { + let path = if args.inbound { + permissions + .check_read(&*args.path, &api_call_expr) + .map_err(NetError::Permission)? + } else { + PathBuf::from(args.path) + }; + + permissions + .check_write_path(Cow::Owned(path), &api_call_expr) + .map_err(NetError::Permission)? + }; + + let mut opts = named_pipe::ServerOptions::new(); + opts + .pipe_mode(args.pipe_mode.into()) + .access_inbound(args.inbound) + .access_outbound(args.outbound); + if args.max_instances.is_some() { + opts.max_instances(args.max_instances.unwrap()); + } + let pipe = NamedPipe::new_server(path.as_ref(), &opts)?; + let rid = state.resource_table.add(pipe); + Ok(rid) +} + +#[op2(stack_trace)] +#[smi] +pub async fn op_pipe_connect( + state: &mut OpState, + #[serde] args: ConnectArgs, + #[string] api_name: &str, +) -> Result +where + NP: NetPermissions + 'static, +{ + let permissions = state.borrow_mut::(); + + let api_call_expr = format!("{}()", api_name); + let path = { + let path = if args.read { + permissions + .check_read(&args.path, &api_call_expr) + .map_err(NetError::Permission)? + } else { + PathBuf::from(args.path) + }; + + if args.write { + permissions + .check_write_path(Cow::Owned(path), &api_call_expr) + .map_err(NetError::Permission)? + } else { + Cow::Owned(path) + } + }; + + let mut opts = named_pipe::ClientOptions::new(); + opts.read(args.read).write(args.write); + let pipe = NamedPipe::new_client(path.as_ref(), &opts)?; + pipe.connect().await?; + let rid = state.resource_table.add(pipe); + Ok(rid) +} diff --git a/ext/net/unix_pipe.rs b/ext/net/unix_pipe.rs new file mode 100644 index 00000000000000..9866ae6c06876c --- /dev/null +++ b/ext/net/unix_pipe.rs @@ -0,0 +1,78 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use std::borrow::Cow; +use std::io; +use std::path::Path; +use std::rc::Rc; + +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::RcRef; +use deno_core::Resource; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::net::unix::pipe; + +pub struct NamedPipe { + inner: AsyncRefCell, + cancel: CancelHandle, +} + +enum Inner { + Receiver(pipe::Receiver), + Sender(pipe::Sender), +} + +impl NamedPipe { + pub fn new_receiver(path: impl AsRef) -> io::Result { + let receiver = pipe::OpenOptions::new().open_receiver(path)?; + Ok(NamedPipe { + inner: AsyncRefCell::new(Inner::Receiver(receiver)), + cancel: Default::default(), + }) + } + + pub fn new_sender(path: impl AsRef) -> io::Result { + let sender = pipe::OpenOptions::new().open_sender(path)?; + Ok(NamedPipe { + inner: AsyncRefCell::new(Inner::Sender(sender)), + cancel: Default::default(), + }) + } + + pub async fn write(self: Rc, buf: &[u8]) -> io::Result { + let mut inner = RcRef::map(&self, |s| &s.inner).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + match &mut *inner { + Inner::Receiver(_) => Err(io::ErrorKind::Unsupported.into()), + Inner::Sender(sender) => sender.write(buf).try_or_cancel(cancel).await, + } + } + + pub async fn read(self: Rc, buf: &mut [u8]) -> io::Result { + let mut inner = RcRef::map(&self, |s| &s.inner).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + match &mut *inner { + Inner::Receiver(receiver) => { + receiver.read(buf).try_or_cancel(cancel).await + } + Inner::Sender(_) => Err(io::ErrorKind::Unsupported.into()), + } + } +} + +impl Resource for NamedPipe { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn name(&self) -> Cow { + Cow::Borrowed("namedPipe") + } + + fn close(self: Rc) { + let inner = RcRef::map(self, |s| &s.cancel); + inner.cancel(); + } +} diff --git a/ext/net/win_pipe.rs b/ext/net/win_pipe.rs new file mode 100644 index 00000000000000..3fbd07ad1be5e0 --- /dev/null +++ b/ext/net/win_pipe.rs @@ -0,0 +1,91 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +use std::borrow::Cow; +use std::ffi::OsStr; +use std::io; +use std::rc::Rc; + +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::RcRef; +use deno_core::Resource; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; +use tokio::net::windows::named_pipe; + +pub struct NamedPipe { + inner: AsyncRefCell, + cancel: CancelHandle, +} + +enum Inner { + Server(named_pipe::NamedPipeServer), + Client(named_pipe::NamedPipeClient), +} + +impl NamedPipe { + pub async fn new_server( + addr: impl AsRef, + options: &named_pipe::ServerOptions, + ) -> io::Result { + let server = options.create(addr)?; + Ok(NamedPipe { + inner: AsyncRefCell::new(Inner::Server(server)), + cancel: Default::default(), + }) + } + + pub fn new_client( + addr: impl AsRef, + options: &named_pipe::ClientOptions, + ) -> io::Result { + let client = options.open(addr)?; + Ok(NamedPipe { + inner: AsyncRefCell::new(Inner::Client(client)), + cancel: Default::default(), + }) + } + + pub async fn connect(&self) -> io::Result<()> { + match &self.inner { + Inner::Server(ref inner) => { + inner.connect().try_or_cancel(&self.cancel).await + } + Inner::Client(ref inner) => Ok(()), + } + } + + pub async fn write(self: Rc, buf: &[u8]) -> io::Result { + let mut inner = RcRef::map(&self, |s| &s.inner).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + match &mut *inner { + Inner::Server(server) => server.write(buf).try_or_cancel(cancel).await, + Inner::Client(client) => client.write(buf).try_or_cancel(cancel).await, + } + } + + pub async fn read(self: Rc, buf: &mut [u8]) -> io::Result { + let mut inner = RcRef::map(&self, |s| &s.inner).borrow_mut().await; + let cancel = RcRef::map(&self, |s| &s.cancel); + match &mut *inner { + Inner::Server(server) => server.read(buf).try_or_cancel(cancel).await, + Inner::Client(client) => client.read(buf).try_or_cancel(cancel).await, + } + } +} + +impl Resource for NamedPipe { + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn name(&self) -> Cow { + Cow::Borrowed("namedPipe") + } + + fn close(self: Rc) { + let inner = RcRef::map(self, |s| &s.cancel); + inner.cancel(); + } +} diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts index 3917be36eeab6d..ad55571975f48a 100644 --- a/ext/node/polyfills/internal_binding/pipe_wrap.ts +++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts @@ -59,7 +59,7 @@ export class Pipe extends ConnectionWrap { override reading = false; ipc: boolean; - // REF: https://github.com/nodejs/node/blob/master/deps/uv/src/win/pipe.c#L48 + // REF: https://github.com/nodejs/node/blob/main/deps/uv/src/win/pipe.c#L48 #pendingInstances = 4; #address?: string; @@ -71,7 +71,10 @@ export class Pipe extends ConnectionWrap { #closed = false; #acceptBackoffDelay?: number; - constructor(type: number, conn?: Deno.UnixConn | StreamBase) { + constructor( + type: number, + conn?: Deno.UnixConn | Deno.PipeNs.Pipe | StreamBase, + ) { let provider: providerType; let ipc: boolean; @@ -137,8 +140,30 @@ export class Pipe extends ConnectionWrap { */ connect(req: PipeConnectWrap, address: string) { if (isWindows) { - // REF: https://github.com/denoland/deno/issues/10244 - notImplemented("Pipe.prototype.connect - Windows"); + Deno.pipe.connect({ + path: address, + kind: "windows", + }).then((pipe: Deno.PipeNs.Pipe) => { + this[kStreamBaseField] = pipe; + this.#address = req.address; + + try { + this.afterConnect(req, 0); + } catch { + // swallow callback errors. + } + }, (e) => { + const code = codeMap.get(e.code ?? "UNKNOWN") ?? + codeMap.get("UNKNOWN")!; + + try { + this.afterConnect(req, code); + } catch { + // swallow callback errors. + } + }); + + return 0; } const connectOptions: Deno.UnixConnectOptions = { @@ -180,15 +205,30 @@ export class Pipe extends ConnectionWrap { * @return An error status code. */ listen(backlog: number): number { - if (isWindows) { - // REF: https://github.com/denoland/deno/issues/10244 - notImplemented("Pipe.prototype.listen - Windows"); - } - this.#backlog = isWindows ? this.#pendingInstances : ceilPowOf2(backlog + 1); + if (isWindows) { + let listener; + try { + listener = Deno.pipe.listen({ + path: this.#address!, + kind: "windows", + pipeMode: "byte", + }); + } catch (e) { + if (e instanceof Deno.errors.NotCapable) { + throw e; + } + return codeMap.get(e.code ?? "UNKNOWN") ?? codeMap.get("UNKNOWN")!; + } + + this.#listener = listener; + + return 0; + } + const listenOptions = { path: this.#address!, transport: "unix" as const, @@ -299,6 +339,10 @@ export class Pipe extends ConnectionWrap { return; } + if (isWindows) { + return; + } + if (this.#connections > this.#backlog!) { this.#acceptBackoff(); diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index ba19e6ce19ebeb..763cffcadadc82 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -14,6 +14,7 @@ import * as console from "ext:deno_console/01_console.js"; import * as ffi from "ext:deno_ffi/00_ffi.js"; import * as net from "ext:deno_net/01_net.js"; import * as tls from "ext:deno_net/02_tls.js"; +import * as pipe from "ext:deno_net/04_pipe.ts"; import * as serve from "ext:deno_http/00_serve.ts"; import * as http from "ext:deno_http/01_http.js"; import * as websocket from "ext:deno_http/02_websocket.ts"; @@ -126,6 +127,7 @@ const denoNs = { stderr: io.stderr, connect: net.connect, listen: net.listen, + pipe: pipe, loadavg: os.loadavg, connectTls: tls.connectTls, listenTls: tls.listenTls,