-
Notifications
You must be signed in to change notification settings - Fork 89
bucket logging - adapt s3 conn to aws sdk v3 (dfs 3887) #9210
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -9,12 +9,12 @@ const path = require('path'); | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { PersistentLogger } = require('../util/persistent_logger'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { format_aws_date } = require('../util/time_utils'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const semaphore = require('../util/semaphore'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const P = require('../util/promise'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const nb_native = require('../util/nb_native'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const AWS = require('aws-sdk'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { Upload } = require("@aws-sdk/lib-storage"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const { once } = require('events'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| //const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // delimiter between bucket name and log object name. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assuming noobaa bucket name follows the s3 bucket | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -27,16 +27,16 @@ const BUCKET_NAME_DEL = "_"; | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * This function will process the persistent log of bucket logging | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * and will upload the log files in using provided noobaa connection | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {nb.NativeFSContext} fs_context | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {AWS.S3} s3_connection | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {function} bucket_to_owner_keys_func | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_keys_func) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function export_logs_to_target(fs_context, s3_conn_func, bucket_to_owner_keys_func) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const entries = await nb_native().fs.readdir(fs_context, config.PERSISTENT_BUCKET_LOG_DIR); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const results = await P.map_with_concurrency(5, entries, async entry => { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!entry.name.endsWith('.log')) return; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, path.parse(entry.name).name, { locking: 'EXCLUSIVE' }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return log.process(async file => _upload_to_targets(s3_connection, file, bucket_to_owner_keys_func)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return log.process(async file => _upload_to_targets(file, s3_conn_func, bucket_to_owner_keys_func)); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dbg.error('processing log file failed', log.file); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| throw err; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -51,12 +51,12 @@ async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * This function gets a persistent log file, will go over it's entries one by one, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * and will upload the entry to the target_bucket using the provided s3 connection | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * in order to know which user to use to upload to each bucket we will need to provide bucket_to_owner_keys_func | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {AWS.S3} s3_connection | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {import('../util/persistent_logger').LogFile} log_file | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @param {function} bucket_to_owner_keys_func | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| * @returns {Promise<Boolean>} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_func) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| async function _upload_to_targets(log_file, s3_conn_func, bucket_to_owner_keys_func) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const bucket_streams = {}; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const promises = []; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -67,33 +67,44 @@ async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const target_bucket = log_entry.log_bucket; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const log_prefix = log_entry.log_prefix; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const source_bucket = log_entry.source_bucket; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let upload_stream = bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket]; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!upload_stream) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /* new stream is needed for each target bucket, but also for each source bucket | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| - as mulitple buckets can't be written to the same object */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const date = new Date(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const upload_stream = new stream.PassThrough(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| upload_stream = new stream.PassThrough(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| let access_keys; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| access_keys = await bucket_to_owner_keys_func(target_bucket); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dbg.warn('Error when trying to resolve bucket keys', err); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the log_bucket doesn't exist any more - nowhere to upload - just skip | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| s3_connection.config.credentials = new AWS.Credentials(access_keys[0].access_key, access_keys[0].secret_key); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| //target bucket can be in either cloud or an NC noobaa bucket | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| //use the s3_conn_func CB to get suitable connection | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const s3_connection = s3_conn_func(access_keys); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| promises.push(sem.surround(() => P.retry({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removing retries, will it impact the bucket logging? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't know how well the retry will behave with the v3 Upload. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| attempts: 3, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| delay_ms: 1000, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| func: () => s3_connection.upload({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const upload = new Upload({ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| client: s3_connection, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| params: { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Bucket: target_bucket, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Body: upload_stream, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }).promise() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }))); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Body: upload_stream | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
86
to
+94
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Possible S3 key collision across different source buckets. Include source bucket in the key. Two different source buckets targeting the same bucket within the same millisecond will compute the same Apply: - const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
+ const sha = crypto.createHash('sha512')
+ .update(`${target_bucket}:${source_bucket}:${date.getTime()}`)
+ .digest('hex');
@@
- Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
+ Key: `${log_prefix}${format_aws_date(date)}-${source_bucket}-${sha.slice(0, 16).toUpperCase()}`,📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| queueSize: 1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+88
to
+96
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid deadlock when waiting for 'drain' if Upload fails. If the Upload stops consuming (e.g., network error), const upload = new Upload({
client: s3_connection,
params: {
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream
},
- queueSize: 1,
+ queueSize: 1,
});
bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream;
- promises.push(upload.done());
+ const upload_promise = upload.done();
+ // If upload fails, unblock writers waiting on 'drain'
+ upload_promise.catch(err => upload_stream.destroy(err));
+ promises.push(upload_promise);
@@
- if (!can_write) {
- dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
- await once(upload_stream, 'drain');
- dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
- }
+ if (!can_write) {
+ dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
+ await Promise.race([
+ once(upload_stream, 'drain'),
+ // Fail fast if the upload errors and stops consuming
+ once(upload_stream, 'error').then(([err]) => Promise.reject(err)),
+ ]);
+ dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
+ }Also applies to: 99-99, 103-107 🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| promises.push(upload.done()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dbg.log2(`uploading entry: ${entry} to target bucket: ${target_bucket}`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket].write(entry + '\n'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| const can_write = upload_stream.write(entry + '\n'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!can_write) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dbg.warn(`waiting for target bucket: ${target_bucket} to drain`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await once(upload_stream, 'drain'); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| dbg.warn(`target bucket: ${target_bucket} drained, resume writing`); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Object.values(bucket_streams).forEach(st => st.end()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await Promise.all(promises); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Fix TLS mismatch and harden credentials handling
endpointis HTTPS whiletls: falseforces HTTP; this mismatch can cause connection issues. Also, add a guard for missing keys and pass sessionToken when present. A small retry bound is prudent.Apply:
function noobaa_con_func(credentials) { const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`; - return new S3({ - endpoint, - forcePathStyle: true, - tls: false, - region: config.DEFAULT_REGION, - requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint), - credentials: { - accessKeyId: credentials[0].access_key, - secretAccessKey: credentials[0].secret_key - } - }); + const k = credentials?.[0]; + if (!k?.access_key || !k?.secret_key) { + throw new Error('Missing bucket owner access keys'); + } + return new S3({ + endpoint, + forcePathStyle: true, + region: config.DEFAULT_REGION, + requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint), + credentials: { + accessKeyId: k.access_key, + secretAccessKey: k.secret_key, + sessionToken: k.session_token + }, + maxAttempts: 3, + }); }🤖 Prompt for AI Agents