Description
- Version: 4.6.0
- Platform: Linux adam-System-Product-Name 4.4.0-38-generic #57~14.04.1-Ubuntu SMP Tue Sep 6 17:20:43 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
- Subsystem: stream
Hi, I have a problem with streams. I'm trying to "streamify" some database selects, so I created a pager. The pager's job is simple: fetch some records, then push them in object mode to a stream. I'll show the relevant parts of the code and the debugger results. I hope this isn't a real issue and I'm just missing something about streams.
My workflow is simple:
[ { /* result objects from database */ }, {}, {} ] -> (push objects to PassThrough) -> {}, {} -> (create array rows) -> [ 'values', 'from', 'objects' ] -> (CSV writer) -> file or HTTP response
```js
const fs = require('fs');
const { Transform, PassThrough } = require('stream');
const csvWrite = require('csv-write-stream');

// This creates arrays from the specified keys of each object
const recordStream = (fields) => new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, enc, done) {
    if (!chunk) {
      return this.end();
    }
    const record = [];
    for (const field of fields) {
      record.push(chunk[field]);
    }
    this.push(record);
    done();
  }
});
```
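To make the transform's behavior concrete, here is a small usage sketch (the field names and the sample row are made up for illustration):

```js
// Hypothetical usage: one database row in, one array out
const toRecord = recordStream(['id', 'name']);
toRecord.on('data', record => console.log(record)); // -> [ 1, 'Ada' ]
toRecord.write({ id: 1, name: 'Ada', extra: 'ignored' });
```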
```js
// This is a csv writer instance from https://www.npmjs.com/package/csv-write-stream
const csvStream = (headers) => csvWrite({ headers });
```
```js
// For more maintainable code, I use generators with promises.
/**
 * Based on this:
 * https://www.promisejs.org/generators/
 */
const gen = function generatorAsync(iterator) {
  return function generatorHandler() {
    const internal = iterator.apply(this, arguments);
    function handle(result) {
      if (result.done) return Promise.resolve(result.value);
      return Promise.resolve(result.value).then(
        res => handle(internal.next(res)),
        err => handle(internal.throw(err))
      );
    }
    try {
      return handle(internal.next());
    } catch (e) {
      return Promise.reject(e);
    }
  };
};
```
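In case it helps, `gen` is essentially a hand-rolled async/await: the generator yields promises and is resumed with their resolved values. A minimal sketch of how it runs (`fakeQuery` is a made-up stand-in for a real database call):

```js
// Hypothetical usage of gen: each yield waits for a promise
const fakeQuery = n => Promise.resolve([`row${n}`]);

const demo = gen(function* () {
  const first = yield fakeQuery(1);  // resumes with [ 'row1' ]
  const second = yield fakeQuery(2); // resumes with [ 'row2' ]
  return first.concat(second);
});

demo().then(rows => console.log(rows)); // -> [ 'row1', 'row2' ]
```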
```js
// The pager gets a stream instance and pushes every record to it.
const pager = gen(function* (stream) {
  let skip = 0;
  const query = 'SELECT FROM E SKIP :skip LIMIT :limit';
  let results = yield db.query(query, { params: { skip, limit: 5000 } });
  while (results && results.length) {
    for (const row of results) {
      stream.push(row);
    }
    skip += results.length;
    results = yield db.query(query, { params: { skip, limit: 5000 } });
  }
  return stream.end();
});
```
```js
const records = recordStream(fields);
const csv = csvStream(fields);
const through = new PassThrough({ objectMode: true });

through
  .pipe(records)
  .pipe(csv)
  .pipe(fs.createWriteStream('./out.csv'));

through.on('end', () => console.log('end'));

pager(through);
```
```js
// Debug: log memory usage every second
setInterval(function () {
  var mem = process.memoryUsage();
  var fmt = v => (v / (1024 * 1024)).toFixed(0) + 'MB';
  console.log('RSS = ' + fmt(mem.rss), 'Heap = ' + fmt(mem.heapUsed));
}, 1000);
```
There are a lot of records in the database, 30,000+, but it runs really fast. The problem is the leak: I ran the server via pm2, and I can see in the monitor that the memory is not released.
When I run this code, the output always looks similar to this:
```
RSS = 80MB Heap = 29MB
RSS = 92MB Heap = 50MB
RSS = 102MB Heap = 55MB
RSS = 108MB Heap = 60MB
RSS = 101MB Heap = 28MB
RSS = 101MB Heap = 41MB
end
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
RSS = 101MB Heap = 41MB
...
```
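To check whether that plateau is a real leak or just memory V8 has not yet returned to the OS, a minimal sketch is to force a full GC by hand; this assumes the process is started with `node --expose-gc`:

```js
// Sketch: force a full GC and see whether heapUsed drops.
// Only works when started with: node --expose-gc app.js
setInterval(() => {
  if (global.gc) { // global.gc is only defined when --expose-gc is set
    global.gc();
    const mem = process.memoryUsage();
    console.log('after gc: Heap =', (mem.heapUsed / (1024 * 1024)).toFixed(0) + 'MB');
  }
}, 10000);
```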
If you want, I can post the whole code here; the rest is just some requires and the database connection.
I have to use OrientDB with orientjs.
Thank you for your help!