@@ -9,12 +9,12 @@ const path = require('path');
 const { PersistentLogger } = require('../util/persistent_logger');
 const { format_aws_date } = require('../util/time_utils');
 const nsfs_schema_utils = require('../manage_nsfs/nsfs_schema_utils');
-const semaphore = require('../util/semaphore');
 const P = require('../util/promise');
 const nb_native = require('../util/nb_native');
-const AWS = require('aws-sdk');
+const { Upload } = require("@aws-sdk/lib-storage");
+const { once } = require('events');

-const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY);
+// const sem = new semaphore.Semaphore(config.BUCKET_LOG_CONCURRENCY);

 // delimiter between bucket name and log object name.
 // Assuming noobaa bucket name follows the s3 bucket
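For context on the new imports: the AWS SDK v2 client (`aws-sdk`) is replaced by the v3 multipart-upload helper `Upload` from `@aws-sdk/lib-storage`, and `once` from Node's `events` module is pulled in for the backpressure handling further down; the per-upload semaphore and retry wrapper are dropped along the way. A minimal sketch of how `Upload` streams a `PassThrough` body (the client config, bucket and key here are placeholders, not values from this PR):

```js
const stream = require('stream');
const { S3 } = require('@aws-sdk/client-s3');
const { Upload } = require('@aws-sdk/lib-storage');

async function example_streaming_upload() {
    // Placeholder client - region/credentials are illustrative only.
    const client = new S3({ region: 'us-east-1' });
    const body = new stream.PassThrough();

    // Upload consumes the stream as it is written and handles multipart parts internally.
    const upload = new Upload({
        client,
        params: { Bucket: 'example-log-bucket', Key: 'example-key', Body: body },
        queueSize: 1, // one part in flight at a time keeps memory bounded
    });

    body.write('first log line\n');
    body.end('last log line\n');
    await upload.done(); // resolves once the whole object is uploaded
}
```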
@@ -27,16 +27,16 @@ const BUCKET_NAME_DEL = "_";
  * This function will process the persistent log of bucket logging
  * and will upload the log files using the provided noobaa connection
  * @param {nb.NativeFSContext} fs_context
- * @param {AWS.S3} s3_connection
+ * @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func
  * @param {function} bucket_to_owner_keys_func
  */
-async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_keys_func) {
+async function export_logs_to_target(fs_context, s3_conn_func, bucket_to_owner_keys_func) {
     const entries = await nb_native().fs.readdir(fs_context, config.PERSISTENT_BUCKET_LOG_DIR);
     const results = await P.map_with_concurrency(5, entries, async entry => {
         if (!entry.name.endsWith('.log')) return;
         const log = new PersistentLogger(config.PERSISTENT_BUCKET_LOG_DIR, path.parse(entry.name).name, { locking: 'EXCLUSIVE' });
         try {
-            return log.process(async file => _upload_to_targets(s3_connection, file, bucket_to_owner_keys_func));
+            return log.process(async file => _upload_to_targets(file, s3_conn_func, bucket_to_owner_keys_func));
         } catch (err) {
             dbg.error('processing log file failed', log.file);
             throw err;
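Instead of a single shared S3 client, the caller now passes `s3_conn_func`, a callback that receives the resolved owner access keys and returns an SDK v3 client pointed at the right endpoint (cloud or NC noobaa). A hypothetical factory, only to illustrate the shape the new signature assumes — the endpoint handling, region and option names below are assumptions, not taken from this PR:

```js
const { S3 } = require('@aws-sdk/client-s3');

// Hypothetical connection factory matching the s3_conn_func signature.
function make_s3_conn_func(endpoint) {
    return access_keys => new S3({
        endpoint,                 // assumed: the log bucket's S3 endpoint
        region: 'us-east-1',      // placeholder region
        forcePathStyle: true,     // common for non-AWS S3 endpoints
        credentials: {
            accessKeyId: access_keys[0].access_key,
            secretAccessKey: access_keys[0].secret_key,
        },
    });
}
```

Usage would then look like `export_logs_to_target(fs_context, make_s3_conn_func('https://...'), bucket_to_owner_keys_func)`, with the real factory in the codebase free to choose between a cloud and an NC noobaa connection per target.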
@@ -51,12 +51,12 @@ async function export_logs_to_target(fs_context, s3_connection, bucket_to_owner_
  * This function gets a persistent log file, will go over its entries one by one,
  * and will upload each entry to the target_bucket using the provided s3 connection.
  * In order to know which user to use to upload to each bucket, we need to provide bucket_to_owner_keys_func
- * @param {AWS.S3} s3_connection
+ * @param {function(Object): import("@aws-sdk/client-s3").S3} s3_conn_func
  * @param {import('../util/persistent_logger').LogFile} log_file
  * @param {function} bucket_to_owner_keys_func
  * @returns {Promise<Boolean>}
  */
-async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_func) {
+async function _upload_to_targets(log_file, s3_conn_func, bucket_to_owner_keys_func) {
     const bucket_streams = {};
     const promises = [];
     try {
@@ -67,33 +67,44 @@ async function _upload_to_targets(s3_connection, log_file, bucket_to_owner_keys_
             const target_bucket = log_entry.log_bucket;
             const log_prefix = log_entry.log_prefix;
             const source_bucket = log_entry.source_bucket;
-            if (!bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket]) {
+            let upload_stream = bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket];
+            if (!upload_stream) {
                 /* new stream is needed for each target bucket, but also for each source bucket
                 - as multiple buckets can't be written to the same object */
                 const date = new Date();
-                const upload_stream = new stream.PassThrough();
+                upload_stream = new stream.PassThrough();
                 let access_keys;
                 try {
                     access_keys = await bucket_to_owner_keys_func(target_bucket);
                 } catch (err) {
                     dbg.warn('Error when trying to resolve bucket keys', err);
                     if (err.rpc_code === 'NO_SUCH_BUCKET') return; // If the log_bucket doesn't exist any more - nowhere to upload - just skip
                 }
-                s3_connection.config.credentials = new AWS.Credentials(access_keys[0].access_key, access_keys[0].secret_key);
+                // target bucket can be either a cloud bucket or an NC noobaa bucket
+                // use the s3_conn_func callback to get a suitable connection
+                const s3_connection = s3_conn_func(access_keys);
                 const sha = crypto.createHash('sha512').update(target_bucket + date.getTime()).digest('hex');
-                promises.push(sem.surround(() => P.retry({
-                    attempts: 3,
-                    delay_ms: 1000,
-                    func: () => s3_connection.upload({
+
+                const upload = new Upload({
+                    client: s3_connection,
+                    params: {
                         Bucket: target_bucket,
                         Key: `${log_prefix}${format_aws_date(date)}-${sha.slice(0, 16).toUpperCase()}`,
-                        Body: upload_stream,
-                    }).promise()
-                })));
+                        Body: upload_stream
+                    },
+                    queueSize: 1,
+                });
+
                 bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket] = upload_stream;
+                promises.push(upload.done());
             }
             dbg.log2(`uploading entry: ${entry} to target bucket: ${target_bucket}`);
-            bucket_streams[source_bucket + BUCKET_NAME_DEL + target_bucket].write(entry + '\n');
+            const can_write = upload_stream.write(entry + '\n');
+            if (!can_write) {
+                dbg.warn(`waiting for target bucket: ${target_bucket} to drain`);
+                await once(upload_stream, 'drain');
+                dbg.warn(`target bucket: ${target_bucket} drained, resume writing`);
+            }
         });
         Object.values(bucket_streams).forEach(st => st.end());
         await Promise.all(promises);
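The write/drain sequence added at the end of the entry loop is standard Node stream backpressure: `PassThrough.write()` returns `false` once its internal buffer exceeds the high-water mark, and `once(stream, 'drain')` suspends the loop until `Upload` (running with `queueSize: 1`) has consumed enough of the buffer. A standalone sketch of the same pattern, independent of the bucket-logging code:

```js
const stream = require('stream');
const { once } = require('events');

async function write_with_backpressure(writable, lines) {
    for (const line of lines) {
        // write() returns false when the internal buffer is above the high-water mark
        if (!writable.write(line + '\n')) {
            await once(writable, 'drain'); // wait for the consumer to catch up
        }
    }
    writable.end(); // no more entries for this object
}

// Example consumer: a PassThrough with a tiny buffer to force drain events.
const pass = new stream.PassThrough({ highWaterMark: 16 });
pass.on('data', chunk => process.stdout.write(chunk));
write_with_backpressure(pass, ['entry-1', 'entry-2', 'entry-3'])
    .then(() => console.log('all entries flushed'));
```

Because the `await once(...)` happens inside the `collect_and_process` callback, a slow target bucket naturally throttles how fast log entries are read, which is what allows the explicit semaphore and retry wrapper from the old code to be dropped.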