@@ -20,7 +20,7 @@ import type {
2020} from './native/types' ;
2121import Logger from '@matrixai/logger' ;
2222import { CreateDestroy , ready } from '@matrixai/async-init/dist/CreateDestroy' ;
23- import { RWLockWriter } from '@matrixai/async-locks' ;
23+ import { Lock , RWLockWriter } from '@matrixai/async-locks' ;
2424import DBIterator from './DBIterator' ;
2525import { rocksdbP } from './native' ;
2626import * as utils from './utils' ;
@@ -49,8 +49,11 @@ class DBTransaction {
4949 protected _callbacksSuccess : Array < ( ) => any > = [ ] ;
5050 protected _callbacksFailure : Array < ( e ?: Error ) => any > = [ ] ;
5151 protected _callbacksFinally : Array < ( e ?: Error ) => any > = [ ] ;
52+ protected _committing : boolean = false ;
5253 protected _committed : boolean = false ;
54+ protected _rollbacking : boolean = false ;
5355 protected _rollbacked : boolean = false ;
56+ protected commitOrRollbackLock : Lock = new Lock ( ) ;
5457
5558 public constructor ( {
5659 db,
@@ -86,9 +89,12 @@ class DBTransaction {
8689 */
8790 public async destroy ( ) {
8891 this . logger . debug ( `Destroying ${ this . constructor . name } ${ this . id } ` ) ;
89- if ( ! this . _committed && ! this . _rollbacked ) {
92+ if ( ! this . _committing && ! this . _rollbacking ) {
9093 throw new errors . ErrorDBTransactionNotCommittedNorRollbacked ( ) ;
9194 }
95+ // Wait for commit or rollback to finish
96+ // this then allows the destruction to proceed
97+ await this . commitOrRollbackLock . waitForUnlock ( ) ;
9298 this . _db . transactionRefs . delete ( this ) ;
9399 // Unlock all locked keys in reverse
94100 const lockedKeys = [ ...this . _locks . keys ( ) ] . reverse ( ) ;
@@ -116,10 +122,30 @@ class DBTransaction {
116122 return this . _callbacksFinally ;
117123 }
118124
125+ /**
126+ * Indicates when `this.commit` is first called
127+ */
128+ get committing ( ) : boolean {
129+ return this . _committing ;
130+ }
131+
132+ /**
133+ * Indicates when the transaction is committed
134+ */
119135 get committed ( ) : boolean {
120136 return this . _committed ;
121137 }
122138
139+ /**
140+ * Indicates when `this.rollback` is first called
141+ */
142+ get rollbacking ( ) : boolean {
143+ return this . _rollbacking ;
144+ }
145+
146+ /**
147+ * Indicates when the transaction is rollbacked
148+ */
123149 get rollbacked ( ) : boolean {
124150 return this . _rollbacked ;
125151 }
@@ -437,75 +463,79 @@ class DBTransaction {
437463
438464 @ready ( new errors . ErrorDBTransactionDestroyed ( ) )
439465 public async commit ( ) : Promise < void > {
440- if ( this . _rollbacked ) {
466+ if ( this . _rollbacking ) {
441467 throw new errors . ErrorDBTransactionRollbacked ( ) ;
442468 }
443- if ( this . _committed ) {
469+ if ( this . _committing ) {
444470 return ;
445471 }
472+ this . _committing = true ;
446473 this . logger . debug ( `Committing ${ this . constructor . name } ${ this . id } ` ) ;
447- for ( const iterator of this . _iteratorRefs ) {
448- await iterator . destroy ( ) ;
449- }
450- this . _committed = true ;
451- try {
474+ await this . commitOrRollbackLock . withF ( async ( ) => {
475+ for ( const iterator of this . _iteratorRefs ) {
476+ await iterator . destroy ( ) ;
477+ }
452478 try {
453- // If this fails, the `DBTransaction` is still considered committed
454- // it must be destroyed, it cannot be reused
455- await rocksdbP . transactionCommit ( this . _transaction ) ;
456- } catch ( e ) {
457- if ( e . code === 'TRANSACTION_CONFLICT' ) {
458- this . logger . debug (
459- `Failed Committing ${ this . constructor . name } ${ this . id } due to ${ errors . ErrorDBTransactionConflict . name } ` ,
460- ) ;
461- throw new errors . ErrorDBTransactionConflict ( undefined , {
462- cause : e ,
463- } ) ;
464- } else {
465- this . logger . debug (
466- `Failed Committing ${ this . constructor . name } ${ this . id } due to ${ e . message } ` ,
467- ) ;
468- throw e ;
479+ try {
480+ // If this fails, the `DBTransaction` is still considered committed
481+ // it must be destroyed, it cannot be reused
482+ await rocksdbP . transactionCommit ( this . _transaction ) ;
483+ } catch ( e ) {
484+ if ( e . code === 'TRANSACTION_CONFLICT' ) {
485+ this . logger . debug (
486+ `Failed Committing ${ this . constructor . name } ${ this . id } due to ${ errors . ErrorDBTransactionConflict . name } ` ,
487+ ) ;
488+ throw new errors . ErrorDBTransactionConflict ( undefined , {
489+ cause : e ,
490+ } ) ;
491+ } else {
492+ this . logger . debug (
493+ `Failed Committing ${ this . constructor . name } ${ this . id } due to ${ e . message } ` ,
494+ ) ;
495+ throw e ;
496+ }
497+ }
498+ for ( const f of this . _callbacksSuccess ) {
499+ await f ( ) ;
500+ }
501+ } finally {
502+ for ( const f of this . _callbacksFinally ) {
503+ await f ( ) ;
469504 }
470505 }
471- for ( const f of this . _callbacksSuccess ) {
472- await f ( ) ;
473- }
474- } finally {
475- for ( const f of this . _callbacksFinally ) {
476- await f ( ) ;
477- }
478- }
479- await this . destroy ( ) ;
506+ this . _committed = true ;
507+ } ) ;
480508 this . logger . debug ( `Committed ${ this . constructor . name } ${ this . id } ` ) ;
481509 }
482510
483511 @ready ( new errors . ErrorDBTransactionDestroyed ( ) )
484512 public async rollback ( e ?: Error ) : Promise < void > {
485- if ( this . _committed ) {
513+ if ( this . _committing ) {
486514 throw new errors . ErrorDBTransactionCommitted ( ) ;
487515 }
488- if ( this . _rollbacked ) {
516+ if ( this . _rollbacking ) {
489517 return ;
490518 }
519+ this . _rollbacking = true ;
491520 this . logger . debug ( `Rollbacking ${ this . constructor . name } ${ this . id } ` ) ;
492- for ( const iterator of this . _iteratorRefs ) {
493- await iterator . destroy ( ) ;
494- }
495- this . _rollbacked = true ;
496- try {
497- // If this fails, the `DBTransaction` is still considered rollbacked
498- // it must be destroyed, it cannot be reused
499- await rocksdbP . transactionRollback ( this . _transaction ) ;
500- for ( const f of this . _callbacksFailure ) {
501- await f ( e ) ;
521+ await this . commitOrRollbackLock . withF ( async ( ) => {
522+ for ( const iterator of this . _iteratorRefs ) {
523+ await iterator . destroy ( ) ;
502524 }
503- } finally {
504- for ( const f of this . _callbacksFinally ) {
505- await f ( e ) ;
525+ try {
526+ // If this fails, the `DBTransaction` is still considered rollbacked
527+ // it must be destroyed, it cannot be reused
528+ await rocksdbP . transactionRollback ( this . _transaction ) ;
529+ for ( const f of this . _callbacksFailure ) {
530+ await f ( e ) ;
531+ }
532+ } finally {
533+ for ( const f of this . _callbacksFinally ) {
534+ await f ( e ) ;
535+ }
506536 }
507- }
508- await this . destroy ( ) ;
537+ this . _rollbacked = true ;
538+ } ) ;
509539 this . logger . debug ( `Rollbacked ${ this . constructor . name } ${ this . id } ` ) ;
510540 }
511541
0 commit comments