
Streams memory leak #8955

@devmetal

Hi, I have a problem with streams. I am trying to "streamify" some database selects, so I created a pager. The pager's job is simple: fetch some records, then push them in object mode into a stream. I will show the relevant parts of the code along with the debug output. I hope this is not a real issue and I have just misunderstood something about streams.

My workflow is simple:
[ { /* result objects from database */ }, {}, {} ] -> (push objects into PassThrough) -> {}, {} -> (create array rows) -> [ 'values', 'from', 'objects' ] -> (CSV writer) -> file or HTTP response

const { Transform, PassThrough } = require('stream');
const fs = require('fs');

// Creates an array of values from the specified keys of each object
const recordStream = (fields) => new Transform({
  readableObjectMode: true,
  writableObjectMode: true,

  transform(chunk, enc, done) {
    if (!chunk) {
      return this.end();
    }

    const record = [];
    for (const field of fields) {
      record.push(chunk[field]);
    }

    this.push(record);
    done();
  }
});
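
To sanity-check this transform in isolation, a couple of objects can be written into it by hand (the field names and sample objects here are made up for illustration):

// Hypothetical standalone check of recordStream
const rows = recordStream(['id', 'name']);
rows.on('data', row => console.log(row)); // logs [ 1, 'a' ] then [ 2, 'b' ]
rows.write({ id: 1, name: 'a', extra: 'ignored' });
rows.write({ id: 2, name: 'b' });
rows.end();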

// csvWrite is the factory from https://www.npmjs.com/package/csv-write-stream
const csvWrite = require('csv-write-stream');
const csvStream = (headers) => csvWrite({ headers });
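
For reference, direct usage of csv-write-stream looks roughly like this; as far as I know, when a headers option is given the writer also accepts array rows in header order, which is exactly what recordStream emits:

const writer = csvStream(['id', 'name']);
writer.pipe(fs.createWriteStream('demo.csv'));
writer.write([1, 'a']); // becomes the line "1,a" under the "id,name" header
writer.end();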


// For more maintainable code, I use generators with promises

/**
 * Based on this
 * https://www.promisejs.org/generators/
 */
const gen = function generatorAsync(iterator) {
  return function generatorHandler() {
    const internal = iterator.apply(this, arguments);

    function handle(result) {
      if (result.done) return Promise.resolve(result.value);

      return Promise.resolve(result.value).then(
        res => handle(internal.next(res)),
        err => handle(internal.throw(err))
      );
    }

    try {
      return handle(internal.next());
    } catch (e) {
      return Promise.reject(e);
    }
  }
}
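
For reference, on newer Node (7.6+) this helper can be replaced by async/await entirely; a function produced by gen behaves like an async function, with each yield becoming an await. A minimal sketch (someAsyncStep is a placeholder, not part of the code above):

// The async/await equivalent of gen(function* (arg) { ... }):
const handler = async function (arg) {
  // each `yield promise` inside the generator becomes `await promise`
  const result = await someAsyncStep(arg);
  return result;
};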

// The pager gets a stream instance and pushes every record into it
const pager = gen(function* (stream) {
  let skip = 0;
  const query = 'SELECT FROM E SKIP :skip LIMIT :limit';
  let results = yield db.query(query, { params: { skip, limit: 5000 } });

  while (results && results.length) {
    for (const row of results)  {
      stream.push(row);
    }

    skip += results.length;
    results = yield db.query(query, { params: { skip, limit: 5000 } });
  }

  return stream.end();
});
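
One thing worth noting: this pager injects rows straight into the PassThrough's readable side with stream.push() and ignores its return value. When push() returns false, the internal buffer is already past the high-water mark, so rows keep piling up in memory until the CSV pipeline catches up. A backpressure-aware sketch (assuming the same db.query API and a Node version with async/await) writes to the writable side and waits for 'drain' instead:

const pagerWithBackpressure = async (stream) => {
  let skip = 0;
  const query = 'SELECT FROM E SKIP :skip LIMIT :limit';

  for (;;) {
    const results = await db.query(query, { params: { skip, limit: 5000 } });
    if (!results || !results.length) break;

    for (const row of results) {
      // write() returns false when the internal buffer is past the
      // high-water mark; wait for 'drain' before writing more rows
      if (!stream.write(row)) {
        await new Promise(resolve => stream.once('drain', resolve));
      }
    }

    skip += results.length;
  }

  stream.end();
};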


const records = recordStream(fields);
const csv = csvStream(fields);
const through = new PassThrough({ objectMode: true });

through
  .pipe(records)
  .pipe(csv)
  .pipe(fs.createWriteStream('./out.csv'));

through.on('end', () => console.log('end'));

pager(through);
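
None of the .pipe() calls above handle 'error' events, so a failure in any stage would go unnoticed. For what it's worth, on Node 10+ the same chain can be wired up with stream.pipeline(), which forwards errors from every stage into a single callback:

const { pipeline } = require('stream');

pipeline(
  through,
  records,
  csv,
  fs.createWriteStream('./out.csv'),
  (err) => console.log(err || 'end')
);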

// Debug
setInterval(function () {
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);

There are a lot of records in the database, 30,000+, but it runs really fast. The problem is the leak: I ran the server via pm2, and I can see in the monitor that the memory is never released.

When I run this code, the output always looks similar to this:

RSS = 80MB Heap = 29MB
RSS = 92MB Heap = 50MB
RSS = 102MB Heap = 55MB
RSS = 108MB Heap = 60MB
RSS = 101MB Heap = 28MB
RSS = 101MB Heap = 41MB
end
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
...
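
To tell whether the heap is genuinely retained or simply has not been collected yet, one can force a collection after the stream ends (this requires starting node with --expose-gc); note that RSS often stays flat even after a successful GC, because V8 does not always return freed pages to the operating system:

// Run with: node --expose-gc app.js
through.on('end', () => {
  setTimeout(() => {
    global.gc(); // force a full collection
    const mem = process.memoryUsage();
    const fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
    console.log('After GC: RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
  }, 1000);
});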

If you want, I can post the whole code here; the only missing parts are some requires and the database connection. I have to use OrientDB with orientjs.

Thank you for your help.

Labels: memory, question, stream