If I want a transform stream to be usable as just a writer as well as a true transform, how do I stop pushing data to a possibly nonexistent downstream? #4340

Open
1mike12 opened this issue Feb 7, 2024 · 3 comments
1mike12 commented Feb 7, 2024

Details

So I have a custom transform stream that basically just lets me accumulate data from upstream and run an async task when it hits a predefined batch limit. The use case is to stream files and batch-insert into a database.

What I understand is that we can use a transform stream either as a true transformer or just as a writer, i.e. the end of the stream pipeline.

However, if we want a stream that can behave correctly in either mode, does this just work automatically? If I push to the read side buffer but there is nothing attached on the read side, wouldn't the read side buffer of my transform grow forever? I don't get it. I can't find any conversation about this anywhere online.
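
On the buffer question: as far as I understand Node's stream implementation, the readable side of a Transform does not grow without bound. Once it holds `highWaterMark` items and nothing is reading, the Transform stops acknowledging writes, so `_transform` is not called again until something reads. Below is a minimal sketch to observe this. `demoBackpressure` and its parameters are hypothetical names made up for illustration, and the small `highWaterMark` is chosen only to make the stall easy to see.

```typescript
import { Transform, TransformCallback } from "node:stream"
import { once } from "node:events"
import { setImmediate as tick } from "node:timers/promises"

// Hypothetical helper: writes `total` objects into a pushing Transform that
// has no consumer, then attaches one and reports what happened in each phase.
export async function demoBackpressure(total = 100, highWaterMark = 4) {
  let transformCalls = 0

  const passthrough = new Transform({
    objectMode: true,
    highWaterMark,
    transform(chunk: unknown, _encoding: BufferEncoding, callback: TransformCallback) {
      transformCalls++
      // Passing a value to the callback pushes it to the readable side.
      callback(null, chunk)
    },
  })

  // Return values of write() are ignored here on purpose, so the pending
  // writes queue up on the writable side. pipe()/pipeline() would instead
  // pause the source once write() returns false.
  for (let i = 0; i < total; i++) passthrough.write(i)
  await tick()

  const callsWhileUnread = transformCalls
  const bufferedWhileUnread = passthrough.readableLength

  // Start consuming (and discarding) the readable side, then end the input.
  passthrough.resume()
  passthrough.end()
  await once(passthrough, "end")

  return { callsWhileUnread, bufferedWhileUnread, callsAfterResume: transformCalls }
}
```

With nothing reading, `callsWhileUnread` should stay well below `total` and `bufferedWhileUnread` should not exceed `highWaterMark`; only after `resume()` do the remaining chunks get transformed. A transform that never pushes, like the one in this issue, leaves the readable buffer empty and so should never hit this stall.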

Node.js version

Not applicable.

Example code

import { Transform, TransformCallback } from "stream"

export class BatchStream extends Transform {
  private readonly batchSize: number
  private readonly asyncProcessFunction: (batch: any[]) => Promise<any>
  private batch: any[]

  constructor(batchSize: number, asyncProcessFunction: (batch: any[]) => Promise<any>) {
    super({ objectMode: true })
    this.batchSize = batchSize
    this.asyncProcessFunction = asyncProcessFunction
    this.batch = []
  }

  _transform(chunk: any, encoding: string, callback: TransformCallback): void {
    this.batch.push(chunk)

    if (this.batch.length >= this.batchSize) {
      void this.processBatch(callback)
    } else {
      callback()
    }
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) {
      void this.processBatch(callback)
    } else {
      callback()
    }
  }

  private async processBatch(callback: TransformCallback): Promise<void> {
    try {
      await this.asyncProcessFunction(this.batch)
      this.batch = []
      callback()
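    } catch (err) {
      // `err` is `unknown` under strict TypeScript, so normalize it to an Error
      callback(err instanceof Error ? err : new Error(String(err)))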
    }
  }
}
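
One possible way to support both modes explicitly, offered as a sketch rather than an established pattern: add a flag that decides whether each batch result is pushed downstream. The class name `ForwardingBatchStream`, the `forwardResults` flag, and the `demo` function are all hypothetical. The sketch assumes that `pipeline` settles once the last stream has finished writing, which is my understanding of its behavior when a Transform sits at the end.

```typescript
import { Readable, Transform, TransformCallback, Writable } from "node:stream"
import { pipeline } from "node:stream/promises"

export class ForwardingBatchStream<T, R> extends Transform {
  private batch: T[] = []

  constructor(
    private readonly batchSize: number,
    private readonly processBatch: (batch: T[]) => Promise<R>,
    // Hypothetical flag: false = act as a sink and never push anything.
    private readonly forwardResults = false,
  ) {
    super({ objectMode: true })
  }

  _transform(chunk: T, _encoding: BufferEncoding, callback: TransformCallback): void {
    this.batch.push(chunk)
    if (this.batch.length >= this.batchSize) this.drain(callback)
    else callback()
  }

  _flush(callback: TransformCallback): void {
    if (this.batch.length > 0) this.drain(callback)
    else callback()
  }

  private drain(callback: TransformCallback): void {
    const batch = this.batch
    this.batch = []
    this.processBatch(batch).then(
      (result) => {
        // Only pass data to the callback when a downstream is expected.
        if (this.forwardResults) callback(null, result)
        else callback()
      },
      (err) => callback(err instanceof Error ? err : new Error(String(err))),
    )
  }
}

// Usage sketch: the same class as the end of a pipeline and as a true transform.
export async function demo() {
  const seen: number[][] = []
  const insert = async (batch: number[]) => {
    seen.push(batch) // stand-in for a database batch insert
    return batch.length
  }

  // Sink mode: nothing is pushed, so no consumer is needed.
  await pipeline(Readable.from([1, 2, 3, 4, 5]), new ForwardingBatchStream(2, insert))

  // Transform mode: each batch size is forwarded to a downstream writer.
  const counts: number[] = []
  const collect = new Writable({
    objectMode: true,
    write(count: number, _encoding, callback) {
      counts.push(count)
      callback()
    },
  })
  await pipeline(Readable.from([1, 2, 3, 4, 5]), new ForwardingBatchStream(2, insert, true), collect)

  return { seen, counts }
}
```

The trade-off is that the caller has to state up front whether a downstream exists; with `forwardResults` set to `true` and no consumer attached, the stream would stall once its readable buffer fills.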

Operating system

n/a

Scope

code

Module and version

streams

@avivkeller
Member

Hi! Are you still experiencing this concern?

github-actions bot commented Jan 7, 2025

It seems there has been no activity on this issue for a while, and it is being closed in 30 days. If you believe this issue should remain open, please leave a comment.
If you need further assistance or have questions, you can also search for similar issues on Stack Overflow.
Make sure to look at the README file for the most updated links.

github-actions bot added the stale label Jan 7, 2025

Author

1mike12 commented Jan 11, 2025

Don't close this; I haven't heard a decent response.

github-actions bot removed the stale label Jan 12, 2025