1+ import type { ResourceRelease } from '@matrixai/resources' ;
2+ import type {
3+ LockBox ,
4+ MultiLockRequest as AsyncLocksMultiLockRequest ,
5+ } from '@matrixai/async-locks' ;
16import type DB from './DB' ;
27import type {
8+ ToString ,
39 KeyPath ,
410 LevelPath ,
511 DBIteratorOptions ,
612 DBClearOptions ,
713 DBCountOptions ,
14+ MultiLockRequest ,
815} from './types' ;
916import type {
1017 RocksDBTransaction ,
@@ -13,6 +20,7 @@ import type {
1320} from './rocksdb/types' ;
1421import Logger from '@matrixai/logger' ;
1522import { CreateDestroy , ready } from '@matrixai/async-init/dist/CreateDestroy' ;
23+ import { RWLockWriter } from '@matrixai/async-locks' ;
1624import DBIterator from './DBIterator' ;
1725import { rocksdbP } from './rocksdb' ;
1826import * as utils from './utils' ;
@@ -21,37 +29,44 @@ import * as errors from './errors';
2129interface DBTransaction extends CreateDestroy { }
2230@CreateDestroy ( )
2331class DBTransaction {
32+ public readonly id : number ;
33+
2434 protected _db : DB ;
2535 protected logger : Logger ;
26-
36+ protected lockBox : LockBox < RWLockWriter > ;
37+ protected _locks : Map <
38+ string ,
39+ {
40+ lock : RWLockWriter ;
41+ type : 'read' | 'write' ;
42+ release : ResourceRelease ;
43+ }
44+ > = new Map ( ) ;
2745 protected _options : RocksDBTransactionOptions ;
2846 protected _transaction : RocksDBTransaction ;
29- protected _id : number ;
3047 protected _snapshot : RocksDBTransactionSnapshot ;
31-
48+ protected _iteratorRefs : Set < DBIterator < any , any > > = new Set ( ) ;
3249 protected _callbacksSuccess : Array < ( ) => any > = [ ] ;
3350 protected _callbacksFailure : Array < ( e ?: Error ) => any > = [ ] ;
3451 protected _callbacksFinally : Array < ( e ?: Error ) => any > = [ ] ;
3552 protected _committed : boolean = false ;
3653 protected _rollbacked : boolean = false ;
3754
38- /**
39- * References to iterators
40- */
41- protected _iteratorRefs : Set < DBIterator < any , any > > = new Set ( ) ;
42-
4355 public constructor ( {
4456 db,
57+ lockBox,
4558 logger,
4659 ...options
4760 } : {
4861 db : DB ;
62+ lockBox : LockBox < RWLockWriter > ;
4963 logger ?: Logger ;
5064 } & RocksDBTransactionOptions ) {
5165 logger = logger ?? new Logger ( this . constructor . name ) ;
5266 logger . debug ( `Constructing ${ this . constructor . name } ` ) ;
5367 this . logger = logger ;
5468 this . _db = db ;
69+ this . lockBox = lockBox ;
5570 const options_ = {
5671 ...options ,
5772 // Transactions should be synchronous
@@ -61,21 +76,24 @@ class DBTransaction {
6176 this . _options = options_ ;
6277 this . _transaction = rocksdbP . transactionInit ( db . db , options_ ) ;
6378 db . transactionRefs . add ( this ) ;
64- this . _id = rocksdbP . transactionId ( this . _transaction ) ;
65- logger . debug ( `Constructed ${ this . constructor . name } ${ this . _id } ` ) ;
79+ this . id = rocksdbP . transactionId ( this . _transaction ) ;
80+ logger . debug ( `Constructed ${ this . constructor . name } ${ this . id } ` ) ;
6681 }
6782
6883 /**
6984 * Destroy the transaction
7085 * This cannot be called until the transaction is committed or rollbacked
7186 */
7287 public async destroy ( ) {
73- this . logger . debug ( `Destroying ${ this . constructor . name } ${ this . _id } ` ) ;
74- this . _db . transactionRefs . delete ( this ) ;
88+ this . logger . debug ( `Destroying ${ this . constructor . name } ${ this . id } ` ) ;
7589 if ( ! this . _committed && ! this . _rollbacked ) {
7690 throw new errors . ErrorDBTransactionNotCommittedNorRollbacked ( ) ;
7791 }
78- this . logger . debug ( `Destroyed ${ this . constructor . name } ${ this . _id } ` ) ;
92+ this . _db . transactionRefs . delete ( this ) ;
93+ // Unlock all locked keys in reverse
94+ const lockedKeys = [ ...this . _locks . keys ( ) ] . reverse ( ) ;
95+ await this . unlock ( ...lockedKeys ) ;
96+ this . logger . debug ( `Destroyed ${ this . constructor . name } ${ this . id } ` ) ;
7997 }
8098
8199 get db ( ) : Readonly < DB > {
@@ -86,17 +104,6 @@ class DBTransaction {
86104 return this . _transaction ;
87105 }
88106
89- get id ( ) : number {
90- return this . _id ;
91- }
92-
93- /**
94- * @internal
95- */
96- get iteratorRefs ( ) : Readonly < Set < DBIterator < any , any > > > {
97- return this . _iteratorRefs ;
98- }
99-
100107 get callbacksSuccess ( ) : Readonly < Array < ( ) => any > > {
101108 return this . _callbacksSuccess ;
102109 }
@@ -117,6 +124,98 @@ class DBTransaction {
117124 return this . _rollbacked ;
118125 }
119126
127+ get locks ( ) : ReadonlyMap <
128+ string ,
129+ {
130+ lock : RWLockWriter ;
131+ type : 'read' | 'write' ;
132+ release : ResourceRelease ;
133+ }
134+ > {
135+ return this . _locks ;
136+ }
137+
138+ /**
139+ * @internal
140+ */
141+ get iteratorRefs ( ) : Readonly < Set < DBIterator < any , any > > > {
142+ return this . _iteratorRefs ;
143+ }
144+
145+ /**
146+ * Lock a sequence of lock requests
147+ * If the lock request doesn't specify, it
148+ * defaults to using `RWLockWriter` with `write` type
149+ * Keys are locked in string sorted order
150+ * Even though keys can be arbitrary strings, by convention, you should use
151+ * keys that correspond to keys in the database
152+ * Locking with the same key is idempotent therefore lock re-entrancy is enabled
153+ * Keys are automatically unlocked in reverse sorted order
154+ * when the transaction is destroyed
155+ * There is no support for lock upgrading or downgrading
156+ * There is no deadlock detection
157+ */
158+ public async lock (
159+ ...requests : Array < MultiLockRequest | string >
160+ ) : Promise < void > {
161+ const requests_ : Array < AsyncLocksMultiLockRequest < RWLockWriter > > = [ ] ;
162+ for ( const request of requests ) {
163+ if ( Array . isArray ( request ) ) {
164+ const [ key , ...lockingParams ] = request ;
165+ const key_ = key . toString ( ) ;
166+ const lock = this . _locks . get ( key_ ) ;
167+ // Default the lock type to `write`
168+ const lockType = ( lockingParams [ 0 ] = lockingParams [ 0 ] ?? 'write' ) ;
169+ if ( lock == null ) {
170+ requests_ . push ( [ key_ , RWLockWriter , ...lockingParams ] ) ;
171+ } else if ( lock . type !== lockType ) {
172+ throw new errors . ErrorDBTransactionLockType ( ) ;
173+ }
174+ } else {
175+ const key_ = request . toString ( ) ;
176+ const lock = this . _locks . get ( key_ ) ;
177+ if ( lock == null ) {
178+ // Default to using `RWLockWriter` write lock for just string keys
179+ requests_ . push ( [ key_ , RWLockWriter , 'write' ] ) ;
180+ } else if ( lock . type !== 'write' ) {
181+ throw new errors . ErrorDBTransactionLockType ( ) ;
182+ }
183+ }
184+ }
185+ if ( requests_ . length > 0 ) {
186+ // Duplicates are eliminated, and the returned acquisitions are sorted
187+ const lockAcquires = this . lockBox . lockMulti ( ...requests_ ) ;
188+ for ( const [ key , lockAcquire , ...lockingParams ] of lockAcquires ) {
189+ const [ lockRelease , lock ] = await lockAcquire ( ) ;
190+ // The `Map` will maintain insertion order
191+ // these must be unlocked in reverse order
192+ // when the transaction is destroyed
193+ this . _locks . set ( key as string , {
194+ lock : lock ! ,
195+ type : lockingParams [ 0 ] ! , // The `type` is defaulted to `write`
196+ release : lockRelease ,
197+ } ) ;
198+ }
199+ }
200+ }
201+
202+ /**
203+ * Unlock a sequence of lock keys
204+ * Unlocking will be done in the order of the keys
205+ * A transaction instance is only allowed to unlock keys that it previously
206+ * locked, all keys that are not part of the `this._locks` is ignored
207+ * Unlocking the same keys is idempotent
208+ */
209+ public async unlock ( ...keys : Array < ToString > ) : Promise < void > {
210+ for ( const key of keys ) {
211+ const key_ = key . toString ( ) ;
212+ const lock = this . _locks . get ( key_ ) ;
213+ if ( lock == null ) continue ;
214+ this . _locks . delete ( key_ ) ;
215+ await lock . release ( ) ;
216+ }
217+ }
218+
120219 public async get < T > (
121220 keyPath : KeyPath | string | Buffer ,
122221 raw ?: false ,
@@ -344,7 +443,7 @@ class DBTransaction {
344443 if ( this . _committed ) {
345444 return ;
346445 }
347- this . logger . debug ( `Committing ${ this . constructor . name } ${ this . _id } ` ) ;
446+ this . logger . debug ( `Committing ${ this . constructor . name } ${ this . id } ` ) ;
348447 for ( const iterator of this . _iteratorRefs ) {
349448 await iterator . destroy ( ) ;
350449 }
@@ -357,12 +456,14 @@ class DBTransaction {
357456 } catch ( e ) {
358457 if ( e . code === 'TRANSACTION_CONFLICT' ) {
359458 this . logger . debug (
360- `Failed Committing ${ this . constructor . name } ${ this . _id } due to ${ errors . ErrorDBTransactionConflict . name } ` ,
459+ `Failed Committing ${ this . constructor . name } ${ this . id } due to ${ errors . ErrorDBTransactionConflict . name } ` ,
361460 ) ;
362- throw new errors . ErrorDBTransactionConflict ( undefined , { cause : e } ) ;
461+ throw new errors . ErrorDBTransactionConflict ( undefined , {
462+ cause : e ,
463+ } ) ;
363464 } else {
364465 this . logger . debug (
365- `Failed Committing ${ this . constructor . name } ${ this . _id } due to ${ e . message } ` ,
466+ `Failed Committing ${ this . constructor . name } ${ this . id } due to ${ e . message } ` ,
366467 ) ;
367468 throw e ;
368469 }
@@ -376,7 +477,7 @@ class DBTransaction {
376477 }
377478 }
378479 await this . destroy ( ) ;
379- this . logger . debug ( `Committed ${ this . constructor . name } ${ this . _id } ` ) ;
480+ this . logger . debug ( `Committed ${ this . constructor . name } ${ this . id } ` ) ;
380481 }
381482
382483 @ready ( new errors . ErrorDBTransactionDestroyed ( ) )
@@ -387,7 +488,7 @@ class DBTransaction {
387488 if ( this . _rollbacked ) {
388489 return ;
389490 }
390- this . logger . debug ( `Rollbacking ${ this . constructor . name } ${ this . _id } ` ) ;
491+ this . logger . debug ( `Rollbacking ${ this . constructor . name } ${ this . id } ` ) ;
391492 for ( const iterator of this . _iteratorRefs ) {
392493 await iterator . destroy ( ) ;
393494 }
@@ -405,7 +506,20 @@ class DBTransaction {
405506 }
406507 }
407508 await this . destroy ( ) ;
408- this . logger . debug ( `Rollbacked ${ this . constructor . name } ${ this . _id } ` ) ;
509+ this . logger . debug ( `Rollbacked ${ this . constructor . name } ${ this . id } ` ) ;
510+ }
511+
512+ /**
513+ * Set the snapshot manually
514+ * This ensures that consistent reads and writes start
515+ * after this method is executed
516+ * This is idempotent
517+ * Note that normally snapshots are set lazily upon the first
518+ * transaction db operation
519+ */
520+ @ready ( new errors . ErrorDBTransactionDestroyed ( ) )
521+ public setSnapshot ( ) : void {
522+ this . setupSnapshot ( ) ;
409523 }
410524
411525 /**
0 commit comments