bucket logging - adapt s3 conn to aws sdk v3 (dfs 3887) #9210
Conversation
Walkthrough
Migrates S3 logging/upload code from AWS SDK v2 to v3: moves …

Changes
Sequence Diagram(s)

sequenceDiagram
autonumber
participant NSFS as manage_nsfs_logging.export_bucket_logging
participant BU as bucket_logs_utils.export_logs_to_target
participant S3F as s3_conn_func (factory)
participant S3 as S3 Client (v3)
participant AWS as S3 Service
NSFS->>BU: export_logs_to_target(fs_ctx, noobaa_con_func, get_bucket_owner_keys)
loop per source log file
BU->>BU: resolve target buckets
loop per target bucket
BU->>BU: bucket_to_owner_keys_func(bucket)
alt access_keys resolved
BU->>S3F: s3_conn_func(access_keys)
S3F-->>BU: S3 client (v3)
BU->>S3: Upload(params {Bucket,Key}, Body: PassThrough)
S3->>AWS: multipart/streamed upload
AWS-->>S3: success
else NO_SUCH_BUCKET
BU->>BU: skip bucket
end
end
end
BU-->>NSFS: boolean success
sequenceDiagram
autonumber
participant BG as BucketLogUploader.scan
participant BU as export_logs_to_target
participant NB as this.noobaa_connection
BG->>BU: export_logs_to_target(fs_ctx, () => NB, get_bucket_owner_keys)
note right of BU: BU invokes factory per-bucket to obtain S3 client
BU-->>BG: boolean success
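
To make the factory arrow in the diagrams concrete, here is a minimal hypothetical sketch; the endpoint, port, and region below are illustrative assumptions, not the project's actual config:

```js
'use strict';
const { S3 } = require('@aws-sdk/client-s3');

// Hypothetical s3_conn_func: export_logs_to_target now receives a function and
// calls it per target bucket with that bucket's resolved owner keys.
function example_s3_conn_func(access_keys) {
    return new S3({
        endpoint: 'https://127.0.0.1:6443', // assumed local endpoint
        forcePathStyle: true,
        region: 'us-east-1',                // assumed default region
        credentials: {
            accessKeyId: access_keys[0].access_key,
            secretAccessKey: access_keys[0].secret_key,
        },
    });
}
```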
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested labels
Suggested reviewers
Pre-merge checks (3 passed)
Force-pushed from 8ffaa4a to 3677607
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/util/bucket_logs_utils.js (1)
75-84: Handle non-NO_SUCH_BUCKET errors from owner-keys resolution

On unexpected errors, the code falls through with undefined access_keys and calls s3_conn_func(access_keys). This will likely throw later in a less actionable place. Short-circuit per-entry or propagate.

     try {
         access_keys = await bucket_to_owner_keys_func(target_bucket);
     } catch (err) {
         dbg.warn('Error when trying to resolve bucket keys', err);
         if (err.rpc_code === 'NO_SUCH_BUCKET') return; // skip this bucket
-    }
+        // For other errors, skip this entry (or rethrow to fail the file)
+        return;
+    }
🧹 Nitpick comments (7)
src/util/bucket_logs_utils.js (4)
16-16: Remove stale semaphore comment

Leftover commented code suggests an obsolete concurrency approach. Delete to avoid confusion.

-//const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY);

53-58: Fix JSDoc parameter order mismatch

The tags list s3_conn_func before log_file, but the function signature is (log_file, s3_conn_func, bucket_to_owner_keys_func).

- * @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func
- * @param {import('../util/persistent_logger').LogFile} log_file
+ * @param {import('../util/persistent_logger').LogFile} log_file
+ * @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func

93-94: Make Upload concurrency configurable

queueSize: 1 serializes part uploads and may be slow. Consider a config knob; default to the library's concurrency when unset.

- queueSize: 1,
+ queueSize: config.BUCKET_LOG_UPLOAD_QUEUE_SIZE ?? undefined,

71-71: Typo: “mulitple” → “multiple”

Minor readability fix in comment.

- - as mulitple buckets can't be written to the same object */
+ - as multiple buckets can't be written to the same object */

src/server/bg_services/bucket_logs_upload.js (1)
45-45: Fix log message typos

Minor polish for operator logs.

- dbg.log0('Logs were uploaded succesfully to their target buckets');
+ dbg.log0('Logs were uploaded successfully to their target buckets');
@@
- dbg.log0('Scaning bucket logging objects in ', BUCKET_LOGS_PATH);
+ dbg.log0('Scanning bucket logging objects in ', BUCKET_LOGS_PATH);
@@
- * Uploades the log object to the log bucket
+ * Uploads the log object to the log bucket
@@
- * name of the bucket it uploades the log objects to the log bucket
+ * name of the bucket it uploads the log objects to the log bucket

Also applies to: 45-45, 56-56, 123-123, 160-160
src/manage_nsfs/manage_nsfs_logging.js (2)
27-32: Clarify JSDoc types and wording

Document the expected credentials shape and the return type for better DX.

Apply:

-/**
- * A CB for bucket_logs_utils to get a v3 S3 connection.
- * @param {Object} credentials for the target bucket
- * @returns {S3} An S3 connection
- */
+/**
+ * Connection factory for bucket_logs_utils to get a v3 S3 client.
+ * @param {Array<{access_key: string, secret_key: string, session_token?: string}>} credentials - target bucket owner keys
+ * @returns {import('@aws-sdk/client-s3').S3} S3 client
+ */

33-47: Optional: cache S3 clients per key to reuse sockets

If export_logs_to_target calls this repeatedly, caching avoids excess client instantiation and improves connection reuse.

Example (outside diff range):

// module scope
const s3ClientCache = new Map(); // key: `${accessKey}:${secretKey}:${config.ENDPOINT_SSL_PORT}`

function noobaa_con_func(credentials) {
    const k = credentials?.[0];
    const cacheKey = `${k.access_key}:${k.secret_key}:${config.ENDPOINT_SSL_PORT}`;
    const cached = s3ClientCache.get(cacheKey);
    if (cached) return cached;
    const client = new S3({ /* ...as configured above... */ });
    s3ClientCache.set(cacheKey, client);
    return client;
}
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
⛔ Files ignored due to path filters (1)
- package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (4)
- package.json (1 hunks)
- src/manage_nsfs/manage_nsfs_logging.js (2 hunks)
- src/server/bg_services/bucket_logs_upload.js (2 hunks)
- src/util/bucket_logs_utils.js (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
src/manage_nsfs/manage_nsfs_logging.js (4)
  src/server/bg_services/bucket_logs_upload.js (3)
    require (10-10), require (11-11), config (7-7)
  src/util/bucket_logs_utils.js (4)
    require (9-9), require (10-10), require (14-14), config (5-5)
  src/sdk/object_sdk.js (2)
    noobaa_s3_client (28-28), config (10-10)
  src/server/system_services/bucket_server.js (2)
    noobaa_s3_client (41-41), config (14-14)
src/server/bg_services/bucket_logs_upload.js (2)
  src/manage_nsfs/manage_nsfs_logging.js (1)
    success (19-19)
  src/cmd/manage_nsfs.js (1)
    fs_context (73-73)
src/util/bucket_logs_utils.js (1)
  src/server/bg_services/bucket_logs_upload.js (5)
    require (10-10), require (11-11), config (7-7), BUCKET_NAME_DEL (21-21), dbg (5-5)
🔇 Additional comments (6)
package.json (1)
79-79: Move lib-storage to runtime deps: OK; verify packaging impact

Change looks correct and versions are aligned with other @aws-sdk modules. If you distribute with pkg, confirm lib-storage's files are picked up (dynamic imports can be tricky with packagers).
src/util/bucket_logs_utils.js (1)
25-33: API change to s3_conn_func: double-check all call sites

Signature switch to a factory is good. Ensure all callers now pass a function (not an S3 client), including bg_services and NSFS paths.
src/server/bg_services/bucket_logs_upload.js (2)
43-44: s3_conn_func ignores per-bucket credentials - confirm intent

Passing () => this.noobaa_connection disregards access_keys resolved by export_logs_to_target. If the uploader is expected to use bucket-owner creds, supply a factory that builds a client from those keys. If admin creds are intended, this is fine.
151-151: Confirm v3-compatible putObject usage

Assuming cloud_utils now returns a v3-compatible wrapper, await noobaa_con.putObject(params) is correct. If it returns a raw S3Client, use client.send(new PutObjectCommand(params)).
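
For reference, a hedged sketch of the two call shapes this comment distinguishes; `noobaa_con` and `put_log_object` below are illustrative stand-ins, not the PR's actual identifiers:

```js
'use strict';
const { PutObjectCommand } = require('@aws-sdk/client-s3');

// Illustrative only: `noobaa_con` stands in for whatever cloud_utils returns.
async function put_log_object(noobaa_con, params) {
    if (typeof noobaa_con.putObject === 'function') {
        // aggregated v3 client (or a compatible wrapper) exposes putObject() directly
        return noobaa_con.putObject(params);
    }
    // bare v3 S3Client: commands are dispatched through send()
    return noobaa_con.send(new PutObjectCommand(params));
}
```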
src/manage_nsfs/manage_nsfs_logging.js (2)
4-5: AWS SDK v3 import and custom request handler: LGTM

Switch to { S3 } and using the NooBaa request handler aligns with the v3 migration plan.
19-19: No changes needed: all export_logs_to_target calls pass thunks
Both call sites now supply a connection factory: src/server/bg_services/bucket_logs_upload.js uses an arrow function, and src/manage_nsfs/manage_nsfs_logging.js uses noobaa_con_func.
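
As a rough illustration of the two call-site shapes described above, under the assumption that the util exports export_logs_to_target from the path shown in this PR (the wrapper function names here are hypothetical):

```js
'use strict';
// Assumed import path; the PR shows this util providing export_logs_to_target.
const { export_logs_to_target } = require('../util/bucket_logs_utils');

// Background-service style (bucket_logs_upload.js): wrap the single admin
// connection in an arrow-function thunk so the util still receives a factory.
function bg_style(fs_context, noobaa_connection, get_bucket_owner_keys) {
    return export_logs_to_target(fs_context, () => noobaa_connection, get_bucket_owner_keys);
}

// NSFS CLI style (manage_nsfs_logging.js): pass a named factory that builds a
// fresh v3 client from the per-bucket owner keys it is called with.
function nsfs_style(fs_context, noobaa_con_func, get_bucket_owner_keys) {
    return export_logs_to_target(fs_context, noobaa_con_func, get_bucket_owner_keys);
}
```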
function noobaa_con_func(credentials) {
    const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;

    return new S3({
        endpoint,
        forcePathStyle: true,
        tls: false,
        region: config.DEFAULT_REGION,
        requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint),
        credentials: {
            accessKeyId: credentials[0].access_key,
            secretAccessKey: credentials[0].secret_key
        }
    });
}
🛠️ Refactor suggestion
Fix TLS mismatch and harden credentials handling
endpoint is HTTPS while tls: false forces HTTP; this mismatch can cause connection issues. Also, add a guard for missing keys and pass sessionToken when present. A small retry bound is prudent.
Apply:
function noobaa_con_func(credentials) {
const endpoint = `https://127.0.0.1:${config.ENDPOINT_SSL_PORT}`;
- return new S3({
- endpoint,
- forcePathStyle: true,
- tls: false,
- region: config.DEFAULT_REGION,
- requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint),
- credentials: {
- accessKeyId: credentials[0].access_key,
- secretAccessKey: credentials[0].secret_key
- }
- });
+ const k = credentials?.[0];
+ if (!k?.access_key || !k?.secret_key) {
+ throw new Error('Missing bucket owner access keys');
+ }
+ return new S3({
+ endpoint,
+ forcePathStyle: true,
+ region: config.DEFAULT_REGION,
+ requestHandler: noobaa_s3_client.get_requestHandler_with_suitable_agent(endpoint),
+ credentials: {
+ accessKeyId: k.access_key,
+ secretAccessKey: k.secret_key,
+ sessionToken: k.session_token
+ },
+ maxAttempts: 3,
+ });
}Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In src/manage_nsfs/manage_nsfs_logging.js around lines 33 to 47, the S3 client
is created with an HTTPS endpoint but tls:false and unguarded credential access;
change tls to true to match the HTTPS endpoint, validate credentials[0] exists
and has access_key and secret_key before using them (throw or return an error if
missing), include credentials[0].session_token as sessionToken when present, and
wrap the S3 creation/initial request in a small retry loop with a tight
maxRetries (e.g., 2-3) and backoff to handle transient failures.
Actionable comments posted: 2
♻️ Duplicate comments (1)
src/util/bucket_logs_utils.js (1)
98-100: Addressed: start uploads immediately to honor backpressure.

Pushing upload.done() into promises at creation time resolves the earlier concern about unbounded buffering. Nice.
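
A minimal standalone sketch of the pattern being praised here, assuming a PassThrough body and the lib-storage Upload; the bucket and key names are placeholders:

```js
'use strict';
const stream = require('stream');
const { once } = require('events');
const { Upload } = require('@aws-sdk/lib-storage');

async function stream_lines_to_s3(s3_client, lines) {
    const body = new stream.PassThrough();
    const upload = new Upload({
        client: s3_client,
        params: { Bucket: 'example-log-bucket', Key: 'example-key', Body: body },
        queueSize: 1, // serialize part uploads, as in the PR
    });

    // Start consuming immediately so writes below do not buffer unboundedly.
    const done = upload.done();

    for (const line of lines) {
        const can_write = body.write(line + '\n');
        if (!can_write) {
            // Honor backpressure: wait until the upload drains the stream.
            await once(body, 'drain');
        }
    }
    body.end();
    await done;
}
```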
🧹 Nitpick comments (4)
src/util/bucket_logs_utils.js (4)
95-96: Queue size is very conservative; consider aligning with config.
queueSize: 1 strictly serializes multipart uploads. You can safely dial this up to match the previous semaphore-based concurrency.

Example:

- queueSize: 1,
+ queueSize: Math.max(1, config.BUCKET_LOG_CONCURRENCY || 1),
103-107: Reduce log verbosity on backpressure drain messages.

These will be frequent on large logs; warn-level is noisy.

Apply:

- dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
+ dbg.log1(`waiting for target bucket: ${target_bucket} to drain`);
  await once(upload_stream, 'drain');
- dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
+ dbg.log1(`target bucket: ${target_bucket} drained, resume writing`);
109-111: Option: capture all upload failures rather than short-circuiting on first.
Promise.all throws on the first rejection, potentially hiding other failures. Consider allSettled and aggregate logging.

Example:

- await Promise.all(promises);
+ const outcomes = await Promise.allSettled(promises);
+ for (const o of outcomes) if (o.status === 'rejected') dbg.error('upload failed', o.reason);
+ if (outcomes.some(o => o.status === 'rejected')) throw new Error('one or more uploads failed');
70-86: Optional: skip re-resolving keys for buckets known to be gone.

Avoid repeated NO_SUCH_BUCKET checks by memoizing skips per log file.

Add near the top of _upload_to_targets:

const skipped_buckets = new Set();

And guard resolution:

if (skipped_buckets.has(target_bucket)) return;
try {
    access_keys = await bucket_to_owner_keys_func(target_bucket);
} catch (err) {
    if (err.rpc_code === 'NO_SUCH_BUCKET') {
        skipped_buckets.add(target_bucket);
        return;
    }
    throw err; // or handle per your policy
}
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (4)
- package.json (1 hunks)
- src/manage_nsfs/manage_nsfs_logging.js (2 hunks)
- src/server/bg_services/bucket_logs_upload.js (2 hunks)
- src/util/bucket_logs_utils.js (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- package.json
- src/server/bg_services/bucket_logs_upload.js
- src/manage_nsfs/manage_nsfs_logging.js
🧰 Additional context used
🧬 Code graph analysis (1)
src/util/bucket_logs_utils.js (2)
  src/manage_nsfs/manage_nsfs_logging.js (4)
    require (4-4), require (7-7), require (8-8), require (11-11)
  src/server/bg_services/bucket_logs_upload.js (2)
    require (10-10), require (11-11)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: run-jest-unit-tests
- GitHub Check: run-package-lock-validation
- GitHub Check: Build Noobaa Image
🔇 Additional comments (3)
src/util/bucket_logs_utils.js (3)
14-16: Good v3 migration imports.

Using @aws-sdk/lib-storage and events.once is appropriate here.
78-86: Clarify behavior when keys resolution fails with other errors.

If resolution throws a non-NO_SUCH_BUCKET error, access_keys remains undefined and the code proceeds. Confirm s3_conn_func(undefined) is intentional and picks the correct default client; otherwise, bail out for unexpected errors.
30-31: All call sites pass factory functions

Verified that both export_logs_to_target invocations use functions for s3_conn_func and bucket_to_owner_keys_func; no direct S3 client instances remain.
  const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
- promises.push(sem.surround(() => P.retry({
-     attempts: 3,
-     delay_ms: 1000,
-     func: () => s3_connection.upload({
+ const upload = new Upload({
+     client: s3_connection,
+     params: {
          Bucket: target_bucket,
          Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
-         Body: upload_stream,
-     }).promise()
- })));
+         Body: upload_stream
+     },
Possible S3 key collision across different source buckets. Include source bucket in the key.
Two different source buckets targeting the same bucket within the same millisecond will compute the same sha and object key. Include source_bucket in the hash and/or the key string.
Apply:
- const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
+ const sha = crypto.createHash('sha512')
+ .update(`${target_bucket}:${source_bucket}:${date.getTime()}`)
+ .digest('hex');
@@
- Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
+ Key: `${log_prefix}${format_aws_date(date)}-${source_bucket}-${sha.slice(0, 16).toUpperCase()}`,

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
const sha = crypto.createHash('sha512')
    .update(`${target_bucket}:${source_bucket}:${date.getTime()}`)
    .digest('hex');
const upload = new Upload({
    client: s3_connection,
    params: {
        Bucket: target_bucket,
        Key: `${log_prefix}${format_aws_date(date)}-${source_bucket}-${sha.slice(0, 16).toUpperCase()}`,
        Body: upload_stream
    },
//use the s3_conn_func CB to get suitable connection
const s3_connection = s3_conn_func(access_keys);
const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
promises.push(sem.surround(() => P.retry({
Removing retries, will it impact the bucket logging?
I don't know how well the retry will behave with the v3 Upload.
WCS we will re-process the log file in the next run.
Signed-off-by: Amit Prinz Setter <[email protected]>
Actionable comments posted: 1
♻️ Duplicate comments (2)
src/util/bucket_logs_utils.js (2)
70-76: Allocate PassThrough only after target keys are resolved to avoid leaks.

If bucket_to_owner_keys_func() returns/throws (e.g., NO_SUCH_BUCKET), the early return leaves an unconsumed PassThrough allocated. Move creation after keys resolution and S3 client construction.

-    const date = new Date();
-    upload_stream = new stream.PassThrough();
+    const date = new Date();
     let access_keys;
     try {
         access_keys = await bucket_to_owner_keys_func(target_bucket);
     } catch (err) {
         dbg.warn('Error when trying to resolve bucket keys', err);
         if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the log_bucket doesn't exist any more - nowhere to upload - just skip
     }
     //target bucket can be in either cloud or an NC noobaa bucket
     //use the s3_conn_func CB to get suitable connection
     const s3_connection = s3_conn_func(access_keys);
+    // Create the stream only after we know we can upload
+    upload_stream = new stream.PassThrough();

Also applies to: 83-86
86-94: Prevent S3 key collisions across different source buckets. Include source bucket in hash/key.

Two source buckets targeting the same destination within the same millisecond can collide. Include source_bucket in the hash and key.

- const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
+ const sha = crypto.createHash('sha512')
+     .update(`${target_bucket}:${source_bucket}:${date.getTime()}`)
+     .digest('hex');
@@
- Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
+ Key: `${log_prefix}${format_aws_date(date)}-${source_bucket}-${sha.slice(0, 16).toUpperCase()}`,
🧹 Nitpick comments (2)
src/util/bucket_logs_utils.js (2)
77-86: Skip entry when bucket owner keys cannot be resolved (non-NO_SUCH_BUCKET).

Continuing with access_keys undefined will likely fail later and abort the whole file. Safer to skip the entry.

     let access_keys;
     try {
         access_keys = await bucket_to_owner_keys_func(target_bucket);
     } catch (err) {
         dbg.warn('Error when trying to resolve bucket keys', err);
         if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the log_bucket doesn't exist any more - nowhere to upload - just skip
+        return; // skip this entry on other resolution errors
     }
+    if (!access_keys) {
+        dbg.warn('Skipping entry: unresolved bucket owner keys', { target_bucket });
+        return;
+    }
95-96: Make Upload queue size configurable.

Expose via config to tune throughput vs memory for different deployments.

- queueSize: 1,
+ queueSize: config.BUCKET_LOG_UPLOAD_QUEUE_SIZE || 1,
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- package-lock.json is excluded by !**/package-lock.json
📒 Files selected for processing (4)
- package.json (1 hunks)
- src/manage_nsfs/manage_nsfs_logging.js (2 hunks)
- src/server/bg_services/bucket_logs_upload.js (2 hunks)
- src/util/bucket_logs_utils.js (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
- package.json
- src/server/bg_services/bucket_logs_upload.js
- src/manage_nsfs/manage_nsfs_logging.js
🧰 Additional context used
🧬 Code graph analysis (1)
src/util/bucket_logs_utils.js (3)
  src/manage_nsfs/manage_nsfs_logging.js (4)
    require (4-4), require (7-7), require (8-8), require (11-11)
  src/server/bg_services/bucket_logs_upload.js (2)
    require (10-10), require (11-11)
  src/endpoint/s3/s3_bucket_logging.js (1)
    log (146-158)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Build Noobaa Image
- GitHub Check: run-jest-unit-tests
- GitHub Check: run-package-lock-validation
🔇 Additional comments (1)
src/util/bucket_logs_utils.js (1)
99-107: Good: uploads start immediately and backpressure is honored.

Starting upload.done() right away attaches the consumer, and the drain wait preserves backpressure. This fixes the buffering concern.
+ const upload = new Upload({
+     client: s3_connection,
+     params: {
          Bucket: target_bucket,
          Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
-         Body: upload_stream,
-     }).promise()
- })));
+         Body: upload_stream
+     },
+     queueSize: 1,
+ });
Avoid deadlock when waiting for 'drain' if Upload fails.
If the Upload stops consuming (e.g., network error), drain may never fire and the writer hangs. Tie the stream to the upload promise and race drain with an error event.
const upload = new Upload({
client: s3_connection,
params: {
Bucket: target_bucket,
Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
Body: upload_stream
},
- queueSize: 1,
+ queueSize: 1,
});
bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream;
- promises.push(upload.done());
+ const upload_promise = upload.done();
+ // If upload fails, unblock writers waiting on 'drain'
+ upload_promise.catch(err => upload_stream.destroy(err));
+ promises.push(upload_promise);
@@
- if (!can_write) {
- dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
- await once(upload_stream, 'drain');
- dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
- }
+ if (!can_write) {
+ dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
+ await Promise.race([
+ once(upload_stream, 'drain'),
+ // Fail fast if the upload errors and stops consuming
+ once(upload_stream, 'error').then(([err]) => Promise.reject(err)),
+ ]);
+ dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
+ }Also applies to: 99-99, 103-107
🤖 Prompt for AI Agents
In src/util/bucket_logs_utils.js around lines 88-96 (and also apply same fix at
lines ~99 and 103-107), the writable stream can hang waiting for 'drain' if the
S3 Upload stops consuming; modify the logic so the stream write waits on a
Promise.race between the stream 'drain' event and the Upload promise (or its
error), i.e., attach error handlers to the upload and convert the Upload to a
promise you can await, then when write returns false await
Promise.race([once(upload, 'error'), once(stream, 'drain'), uploadPromise]) so
that an upload error will reject and unblock the writer; ensure you remove
listeners after resolution to avoid leaks and propagate the upload error up.
Signed-off-by: Amit Prinz Setter <[email protected]>
[Backport into 5.20] bucket logging - adapt s3 conn to aws sdk v3 (dfs 3887) (#9210)
Describe the Problem
cloud_utils.js was adapted to use the S3 client from AWS SDK v3.
However, bucket_logs_utils.js was not.
This commit adapts bucket_logs_utils.js to use the new v3 S3 client.
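
Roughly, the upload call shape changes like this (a simplified sketch of the v2-to-v3 migration, not the exact diff; parameter names are illustrative):

```js
'use strict';
const { Upload } = require('@aws-sdk/lib-storage');

// AWS SDK v2 (before): the client had a managed .upload() returning a request
// object with .promise():
//   await s3_connection.upload({ Bucket, Key, Body: upload_stream }).promise();

// AWS SDK v3 (after): multipart/streamed uploads go through lib-storage's Upload:
async function upload_v3(s3_connection, Bucket, Key, upload_stream) {
    const upload = new Upload({
        client: s3_connection,
        params: { Bucket, Key, Body: upload_stream },
    });
    return upload.done();
}
```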
Explain the Changes
Issues: Fixed #xxx / Gap #xxx
Testing Instructions:
Summary by CodeRabbit
Refactor
Chores