Skip to content

feat(net): named pipe support via Deno.pipe #29308

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 133 additions & 0 deletions cli/tsc/dts/lib.deno_net.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -953,5 +953,138 @@ declare namespace Deno {
conn: QuicConn,
): Promise<WebTransport & { url: string }>;

/**
* @category Network
* Named pipes for Deno
*/
export namespace PipeNs {
/** @category Network */
export interface Pipe extends Disposable {
/** Read the incoming data from the pipe into an array buffer (`p`).
*
* Resolves to either the number of bytes read during the operation or EOF
* (`null`) if there was nothing more to read.
*
* It is possible for a read to successfully return with `0` bytes. This
* does not indicate EOF.
*
* **It is not guaranteed that the full buffer will be read in a single
* call.**
*
* ```ts
* // If the text "hello world" is received by the client:
* const pipe = await Deno.pipe.listen({ path: "/tmp/my-pipe" });
* const buf = new Uint8Array(100);
* const numberOfBytesRead = await pipe.read(buf); // 11 bytes
* const text = new TextDecoder().decode(buf); // "hello world"
* ```
*
* @category I/O
*/
read(p: Uint8Array): Promise<number | null>;
/** Write the contents of the array buffer (`p`) to the pipe.
*
* Resolves to the number of bytes written.
*
* **It is not guaranteed that the full buffer will be written in a single
* call.**
*
* ```ts
* const pipe = await Deno.pipe.connect({ path: "/tmp/my-pipe", create: true });
* const encoder = new TextEncoder();
* const data = encoder.encode("Hello world");
* const bytesWritten = await pipe.write(data); // 11
* ```
*
* @category I/O
*/
write(p: Uint8Array): Promise<number>;
/** Closes the pipe, freeing the resource.
*
* ```ts
* const pipe = await Deno.pipe.connect({ path: "/tmp/my-pipe" });
*
* // ...
*
* pipe.close();
* ```
*/
close(): void;

/** Make the pipe block the event loop from finishing.
*
* Note: the pipe blocks the event loop from finishing by default.
* This method is only meaningful after `.unref()` is called.
*/
ref(): void;
/** Make the pipe not block the event loop from finishing. */
unref(): void;

readonly readable: ReadableStream<Uint8Array<ArrayBuffer>>;
readonly writable: WritableStream<Uint8Array<ArrayBufferLike>>;
}

/**
* Pipe Options
*
* @category Network
*/
export interface Options {
path: string;
kind: Kind;
}

/** @category Network */
export interface WindowsListenOptions extends Options {
kind: "windows";
maxInstances?: number;
pipeMode: PipeMode;
inbound?: boolean;
outbound?: boolean;
}

/** @category Network */
export interface UnixListenOptions extends Options {
kind: "unix";
mode?: number;
create?: boolean;
}

/**
* @category Network
*
* Connect to a named pipe.
*/
export function connect(options: Options): Pipe;

/**
* Connect to a named pipe.
*
* @category Network
*/
export function listen(options: Options): Pipe;

/**
* Connect to a named pipe.
*
* only works on Windows.
*
* @category Network
*/
export function listen(options: WindowsListenOptions): Pipe;

/**
* Connect to a named pipe.
*
* only works on Unix-like systems.
*
* @category Network
*/
export function listen(options: UnixListenOptions): Pipe;
}

/** @category Network */
export const pipe: PipeNs;

export {}; // only export exports
}
177 changes: 177 additions & 0 deletions ext/net/04_pipe.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright 2018-2025 the Deno authors. MIT license.

import { core, primordials } from "ext:core/mod.js";

const {
internalRidSymbol,
} = core;
import { op_pipe_connect, op_pipe_listen } from "ext:core/ops";
const {
Error,
ObjectDefineProperty,
SymbolDispose,
SafeSet,
SetPrototypeAdd,
SetPrototypeDelete,
SetPrototypeForEach,
} = primordials;
import {
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
writableStreamForRid,
} from "ext:deno_web/06_streams.js";

enum PipeMode {
Message = "message",
Byte = "byte",
}

type Kind = "unix" | "windows";

interface Options {
path: string;
kind: Kind;
}

interface WindowsListenOptions extends Options {
kind: "windows";
maxInstances?: number;
pipeMode: PipeMode;
inbound?: boolean;
outbound?: boolean;
}

interface WindowsConnectOptions extends Options {
kind: "windows";
read?: boolean;
write?: boolean;
}

interface UnixListenOptions extends Options {
kind: "unix";
mode?: number;
create?: boolean;
}

async function write(rid, data) {
return await core.write(rid, data);
}

class Pipe {
readonly #rid = 0;
#unref = false;
#pendingReadPromises = new SafeSet();

#readable;
#writable;

constructor(rid: number) {
ObjectDefineProperty(this, internalRidSymbol, {
__proto__: null,
enumerable: false,
value: rid,
});

this.#rid = rid;
}

write(buffer): Promise<number> {
return write(this.#rid, buffer);
}

async read(buffer): Promise<number> {
if (buffer.length === 0) {
return 0;
}
const promise = core.read(this.#rid, buffer);
if (this.#unref) core.unrefOpPromise(promise);
SetPrototypeAdd(this.#pendingReadPromises, promise);
let nread;
try {
nread = await promise;
} catch (e) {
throw e;
} finally {
SetPrototypeDelete(this.#pendingReadPromises, promise);
}
return nread === 0 ? null : nread;
}

close() {
core.close(this.#rid);
}
get readable(): ReadableStream<Uint8Array> {
if (this.#readable === undefined) {
this.#readable = readableStreamForRidUnrefable(this.#rid);
if (this.#unref) {
readableStreamForRidUnrefableUnref(this.#readable);
}
}
return this.#readable;
}

get writable(): WritableStream<Uint8Array> {
if (this.#writable === undefined) {
this.#writable = writableStreamForRid(this.#rid);
}
return this.#writable;
}

ref() {
this.#unref = false;
if (this.#readable) {
readableStreamForRidUnrefableRef(this.#readable);
}

SetPrototypeForEach(
this.#pendingReadPromises,
(promise) => core.refOpPromise(promise),
);
}

unref() {
this.#unref = true;
if (this.#readable) {
readableStreamForRidUnrefableUnref(this.#readable);
}
SetPrototypeForEach(
this.#pendingReadPromises,
(promise) => core.unrefOpPromise(promise),
);
}

[SymbolDispose]() {
core.tryClose(this.#rid);
}
}

async function connect(opts: Options | WindowsConnectOptions) {
let rid: number;
switch (opts.kind) {
case "unix":
rid = op_pipe_connect(opts.path, "Deno.pipe.connect");
return new Pipe(rid);
case "windows":
rid = await op_pipe_connect(opts, "Deno.pipe.connect");
return new Pipe(rid);
default:
throw new Error(`Unsupported kind: ${opts.kind}`);
}
}

function listen(opts: WindowsListenOptions | UnixListenOptions) {
let rid: number;
switch (opts.kind) {
case "unix":
rid = op_pipe_listen(opts, "Deno.pipe.listen");
return new Pipe(rid);
case "windows":
rid = op_pipe_listen(opts, "Deno.pipe.listen");
return new Pipe(rid);
default:
throw new Error(`Unsupported kind: ${opts.kind}`);
}
}

export { connect, listen };
1 change: 1 addition & 0 deletions ext/net/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ deno_permissions.workspace = true
deno_tls.workspace = true
hickory-proto.workspace = true
hickory-resolver.workspace = true
nix = { workspace = true, features = ["fs"] }
pin-project.workspace = true
quinn = { workspace = true, features = ["runtime-tokio", "rustls", "ring"] }
rustls-tokio-stream.workspace = true
Expand Down
15 changes: 14 additions & 1 deletion ext/net/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,23 @@

pub mod io;
pub mod ops;
#[cfg(unix)]
#[path = "ops_unix_pipe.rs"]
mod ops_pipe;
#[cfg(windows)]
#[path = "ops_win_pipe.rs"]
mod ops_pipe;
pub mod ops_tls;
#[cfg(unix)]
pub mod ops_unix;
mod quic;
pub mod raw;
pub mod resolve_addr;
pub mod tcp;
#[cfg(unix)]
mod unix_pipe;
#[cfg(windows)]
mod win_pipe;

use std::borrow::Cow;
use std::path::Path;
Expand Down Expand Up @@ -220,8 +230,11 @@ deno_core::extension!(deno_net,
quic::op_quic_send_stream_set_priority,
quic::webtransport::op_webtransport_accept,
quic::webtransport::op_webtransport_connect,

ops_pipe::op_pipe_listen<P>,
ops_pipe::op_pipe_connect<P>,
],
esm = [ "01_net.js", "02_tls.js" ],
esm = [ "01_net.js", "02_tls.js", "04_pipe.ts" ],
lazy_loaded_esm = [ "03_quic.js" ],
options = {
root_cert_store_provider: Option<Arc<dyn RootCertStoreProvider>>,
Expand Down
Loading
Loading