Merged
5 changes: 1 addition & 4 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -76,6 +76,7 @@
"@aws-sdk/client-sts": "3.864.0",
"@aws-sdk/credential-providers": "3.864.0",
"@aws-sdk/s3-request-presigner": "3.864.0",
"@aws-sdk/lib-storage": "3.864.0",
"@azure/identity": "4.10.2",
"@azure/monitor-query": "1.3.2",
"@azure/storage-blob": "12.27.0",
@@ -126,7 +127,6 @@
},
"devDependencies": {
"@aws-sdk/client-iam": "3.864.0",
"@aws-sdk/lib-storage": "3.864.0",
"@stylistic/eslint-plugin-js": "1.8.1",
"@types/jest": "30.0.0",
"@types/lodash": "4.17.20",
37 changes: 25 additions & 12 deletions src/manage_nsfs/manage_nsfs_logging.js
@@ -1,9 +1,9 @@
/* Copyright (C) 2024 NooBaa */
'use strict';

const AWS = require('aws-sdk');
const { S3 } = require('@aws-sdk/client-s3');
const noobaa_s3_client = require('../sdk/noobaa_s3_client/noobaa_s3_client');
const config = require('../../config');
const http_utils = require('../util/http_utils');
const { account_id_cache } = require('../sdk/accountspace_fs');
const { export_logs_to_target } = require('../util/bucket_logs_utils');
const ManageCLIError = require('../manage_nsfs/manage_nsfs_cli_errors').ManageCLIError;
@@ -16,23 +16,36 @@ let config_fs;
*/
async function export_bucket_logging(shared_config_fs) {
config_fs = shared_config_fs;
const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;
const noobaa_con = new AWS.S3({
endpoint,
s3ForcePathStyle: true,
sslEnabled: false,
httpOptions: {
agent: http_utils.get_unsecured_agent(endpoint)
}
});
const success = await export_logs_to_target(config_fs.fs_context, noobaa_con, get_bucket_owner_keys);
const success = await export_logs_to_target(config_fs.fs_context, noobaa_con_func, get_bucket_owner_keys);
if (success) {
write_stdout_response(ManageCLIResponse.LoggingExported);
} else {
throw_cli_error(ManageCLIError.LoggingExportFailed);
}
}

/**
* A CB for bucket_logs_utils to get a v3 S3 connection.
* @param {Object} credentials for the target bucket
* @returns {S3} An S3 connection
*/

function noobaa_con_func(credentials) {
const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;

return new S3({
endpoint,
forcePathStyle: true,
tls: false,
region: config.DEFAULT_REGION,
requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint),
credentials: {
accessKeyId: credentials[0].access_key,
secretAccessKey: credentials[0].secret_key
}
});
}
Comment on lines +33 to +47
🛠️ Refactor suggestion

Fix TLS mismatch and harden credentials handling

endpoint is HTTPS while tls: false forces HTTP; this mismatch can cause connection issues. Also, add a guard for missing keys and pass sessionToken when present. A small retry bound is prudent.

Apply:

 function noobaa_con_func(credentials) {
     const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;
 
-    return new S3({
-            endpoint,
-            forcePathStyle: true,
-            tls: false,
-            region: config.DEFAULT_REGION,
-            requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint),
-            credentials: {
-                accessKeyId: credentials[0].access_key,
-                secretAccessKey: credentials[0].secret_key
-            }
-    });
+    const k = credentials?.[0];
+    if (!k?.access_key || !k?.secret_key) {
+        throw new Error('Missing bucket owner access keys');
+    }
+    return new S3({
+        endpoint,
+        forcePathStyle: true,
+        region: config.DEFAULT_REGION,
+        requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint),
+        credentials: {
+            accessKeyId: k.access_key,
+            secretAccessKey: k.secret_key,
+            sessionToken: k.session_token
+        },
+        maxAttempts: 3,
+    });
 }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In src/manage_nsfs/manage_nsfs_logging.js around lines 33 to 47, the S3 client
is created with an HTTPS endpoint but tls:false and unguarded credential access;
change tls to true to match the HTTPS endpoint, validate credentials[0] exists
and has access_key and secret_key before using them (throw or return an error if
missing), include credentials[0].session_token as sessionToken when present, and
wrap the S3 creation/initial request in a small retry loop with a tight
maxRetries (e.g., 2-3) and backoff to handle transient failures.


/**
* return bucket owner's access and secret key
* @param {string} log_bucket_name
4 changes: 2 additions & 2 deletions src/server/bg_services/bucket_logs_upload.js
@@ -40,7 +40,7 @@ class BucketLogUploader {
}
if (config.BUCKET_LOG_TYPE === 'PERSISTENT') {
const fs_context = get_process_fs_context();
const success = await export_logs_to_target(fs_context, this.noobaa_connection, this.get_bucket_owner_keys);
const success = await export_logs_to_target(fs_context, () => this.noobaa_connection, this.get_bucket_owner_keys);
if (success) {
dbg.log0('Logs were uploaded succesfully to their target buckets');
} else {
@@ -148,7 +148,7 @@
};

try {
await noobaa_con.putObject(params).promise();
await noobaa_con.putObject(params);
} catch (err) {
dbg.error('Failed to upload bucket log object: ', log_object.log_object_name, ' to bucket: ', log_object.log_bucket_name, ' :', err);
}
49 changes: 30 additions & 19 deletions src/util/bucket_logs_utils.js
@@ -9,12 +9,12 @@ const path = require('path');
const { PersistentLogger } = require('../util/persistent_logger');
const { format_aws_date } = require('../util/time_utils');
const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils');
const semaphore = require('../util/semaphore');
const P = require('../util/promise');
const nb_native = require('../util/nb_native');
const AWS = require('aws-sdk');
const { Upload } = require("@aws-sdk/lib-storage");
const { once } = require('events');

const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY);
//const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY);

// delimiter between bucket name and log object name.
// Assuming noobaa bucket name follows the s3 bucket
@@ -27,16 +27,16 @@ const BUCKET_NAME_DEL = "_";
* This function will process the persistent log of bucket logging
* and will upload the log files in using provided noobaa connection
* @param {nb.NativeFSContext} fs_context
* @param {AWS.S3} s3_connection
* @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func
* @param {function} bucket_to_owner_keys_func
*/
async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_keys_func) {
async function export_logs_to_target(fs_context, s3_conn_func, bucket_to_owner_keys_func) {
const entries = await nb_native().fs.readdir(fs_context, config.PERSISTENT_BUCKET_LOG_DIR);
const results = await P.map_with_concurrency(5, entries, async entry => {
if (!entry.name.endsWith('.log')) return;
const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, path.parse(entry.name).name, { locking: 'EXCLUSIVE' });
try {
return log.process(async file => _upload_to_targets(s3_connection, file, bucket_to_owner_keys_func));
return log.process(async file => _upload_to_targets(file, s3_conn_func, bucket_to_owner_keys_func));
} catch (err) {
dbg.error('processing log file failed', log.file);
throw err;
@@ -51,12 +51,12 @@ async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_
* This function gets a persistent log file, will go over it's entries one by one,
* and will upload the entry to the target_bucket using the provided s3 connection
* in order to know which user to use to upload to each bucket we will need to provide bucket_to_owner_keys_func
* @param {AWS.S3} s3_connection
* @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func
* @param {import('../util/persistent_logger').LogFile} log_file
* @param {function} bucket_to_owner_keys_func
* @returns {Promise<Boolean>}
*/
async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_func) {
async function _upload_to_targets(log_file, s3_conn_func, bucket_to_owner_keys_func) {
const bucket_streams = {};
const promises = [];
try {
@@ -67,33 +67,44 @@ async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_
const target_bucket = log_entry.log_bucket;
const log_prefix = log_entry.log_prefix;
const source_bucket = log_entry.source_bucket;
if (!bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket]) {
let upload_stream = bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket];
if (!upload_stream) {
/* new stream is needed for each target bucket, but also for each source bucket
- as mulitple buckets can't be written to the same object */
const date = new Date();
const upload_stream = new stream.PassThrough();
upload_stream = new stream.PassThrough();
let access_keys;
try {
access_keys = await bucket_to_owner_keys_func(target_bucket);
} catch (err) {
dbg.warn('Error when trying to resolve bucket keys', err);
if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the log_bucket doesn't exist any more - nowhere to upload - just skip
}
s3_connection.config.credentials = new AWS.Credentials(access_keys[0].access_key, access_keys[0].secret_key);
//target bucket can be in either cloud or an NC noobaa bucket
//use the s3_conn_func CB to get suitable connection
const s3_connection = s3_conn_func(access_keys);
const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
promises.push(sem.surround(() => P.retry({
Contributor:

Removing retries, will it impact the bucket logging?

Contributor Author:

I don't know how well the retry will behave with the v3 Upload.
Worst case, we will re-process the log file in the next run.
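
For reference, a minimal sketch (not part of this PR) of how a bounded retry could be reintroduced around the v3 Upload. It assumes the P.retry helper keeps the { attempts, delay_ms, func } signature used in the removed code below, and that the caller can recreate the Body for each attempt, since a partially consumed PassThrough stream cannot be replayed:

const P = require('../util/promise');
const { Upload } = require('@aws-sdk/lib-storage');

// Hypothetical helper: bound the multipart upload to a few attempts.
// make_body must return a fresh Body on every attempt (e.g. a new stream or buffer),
// because a stream that was already partially consumed cannot be uploaded again.
async function upload_with_retry(s3_connection, params, make_body) {
    return P.retry({
        attempts: 3,
        delay_ms: 1000,
        func: () => new Upload({
            client: s3_connection,
            params: { ...params, Body: make_body() },
            queueSize: 1,
        }).done(),
    });
}

This limitation is also why re-processing the persistent log file on the next run is the simpler fallback: the PassThrough stream feeding the Upload cannot be rewound after a failed attempt.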

attempts: 3,
delay_ms: 1000,
func: () => s3_connection.upload({

const upload = new Upload({
client: s3_connection,
params: {
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream,
}).promise()
})));
Body: upload_stream
},
Comment on lines 86 to +94

⚠️ Potential issue

Possible S3 key collision across different source buckets. Include source bucket in the key.

Two different source buckets targeting the same bucket within the same millisecond will compute the same sha and object key. Include source_bucket in the hash and/or the key string.

Apply:

-                const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
+                const sha = crypto.createHash('sha512')
+                    .update(`${target_bucket}:${source_bucket}:${date.getTime()}`)
+                    .digest('hex');

@@
-                        Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
+                        Key: `${log_prefix}${format_aws_date(date)}-${source_bucket}-${sha.slice(0, 16).toUpperCase()}`,
📝 Committable suggestion


Suggested change
const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
promises.push(sem.surround(() => P.retry({
attempts: 3,
delay_ms: 1000,
func: () => s3_connection.upload({
const upload = new Upload({
client: s3_connection,
params: {
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream,
}).promise()
})));
Body: upload_stream
},
const sha = crypto.createHash('sha512')
.update(`${target_bucket}:${source_bucket}:${date.getTime()}`)
.digest('hex');
const upload = new Upload({
client: s3_connection,
params: {
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${source_bucket}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream
},

queueSize: 1,
});
Comment on lines +88 to +96

⚠️ Potential issue

Avoid deadlock when waiting for 'drain' if Upload fails.

If the Upload stops consuming (e.g., network error), drain may never fire and the writer hangs. Tie the stream to the upload promise and race drain with an error event.

                 const upload = new Upload({
                     client: s3_connection,
                     params: {
                         Bucket: target_bucket,
                         Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
                         Body: upload_stream
                     },
-                    queueSize: 1,
+                    queueSize: 1,
                 });
 
                 bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream;
-                promises.push(upload.done());
+                const upload_promise = upload.done();
+                // If upload fails, unblock writers waiting on 'drain'
+                upload_promise.catch(err => upload_stream.destroy(err));
+                promises.push(upload_promise);
@@
-            if (!can_write) {
-                dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
-                await once(upload_stream, 'drain');
-                dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
-            }
+            if (!can_write) {
+                dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
+                await Promise.race([
+                    once(upload_stream, 'drain'),
+                    // Fail fast if the upload errors and stops consuming
+                    once(upload_stream, 'error').then(([err]) => Promise.reject(err)),
+                ]);
+                dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
+            }

Also applies to: 99-99, 103-107

🤖 Prompt for AI Agents
In src/util/bucket_logs_utils.js around lines 88-96 (and also apply same fix at
lines ~99 and 103-107), the writable stream can hang waiting for 'drain' if the
S3 Upload stops consuming; modify the logic so the stream write waits on a
Promise.race between the stream 'drain' event and the Upload promise (or its
error), i.e., attach error handlers to the upload and convert the Upload to a
promise you can await, then when write returns false await
Promise.race([once(upload, 'error'), once(stream, 'drain'), uploadPromise]) so
that an upload error will reject and unblock the writer; ensure you remove
listeners after resolution to avoid leaks and propagate the upload error up.


bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream;
promises.push(upload.done());
}
dbg.log2(`uploading entry: ${entry} to target bucket: ${target_bucket}`);
bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket].write(entry + '\n');
const can_write = upload_stream.write(entry + '\n');
if (!can_write) {
dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
await once(upload_stream, 'drain');
dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
}
});
Object.values(bucket_streams).forEach(st => st.end());
await Promise.all(promises);