3636import io .vertx .sqlclient .RowSet ;
3737import io .vertx .sqlclient .SqlConnection ;
3838import io .vertx .sqlclient .SqlResult ;
39- import io .vertx .sqlclient .Transaction ;
4039import io .vertx .sqlclient .Tuple ;
4140import io .vertx .sqlclient .spi .DatabaseMetadata ;
4241
42+ import static org .hibernate .reactive .util .impl .CompletionStages .failedFuture ;
4343import static org .hibernate .reactive .util .impl .CompletionStages .rethrow ;
44+ import static org .hibernate .reactive .util .impl .CompletionStages .supplyStage ;
4445import static org .hibernate .reactive .util .impl .CompletionStages .voidFuture ;
4546
4647/**
@@ -62,7 +63,11 @@ public class SqlClientConnection implements ReactiveConnection {
6263 private final SqlConnection connection ;
6364 // The context associated to the connection. We expect the connection to be executed in this context.
6465 private final ContextInternal connectionContext ;
65- private Transaction transaction ;
66+
67+ // We keep track of this because the close could be called multiple times if an error occurs, and this creates
68+ // several additional useless exception in the stacktrace
69+ private boolean closed = false ;
70+
6671
6772 SqlClientConnection (
6873 SqlConnection connection ,
@@ -362,52 +367,86 @@ private SqlConnection client() {
362367
363368 @ Override
364369 public CompletionStage <Void > beginTransaction () {
365- if ( transaction != null ) {
366- throw new IllegalStateException ( "Can't begin a new transaction as an active transaction is already associated to this connection" );
367- }
368- return connection .begin ()
369- .onSuccess ( tx -> LOG .tracef ( "Transaction started: %s" , tx ) )
370- .onFailure ( v -> LOG .errorf ( "Failed to start a transaction: %s" , transaction ) )
371- .toCompletionStage ()
372- .thenAccept ( this ::setTransaction );
370+ return validateLiveTransaction ( "starting a new transaction" )
371+ .thenCompose ( v -> connection .begin ()
372+ .onSuccess ( tx -> LOG .tracef ( "Transaction started: %s" , tx ) )
373+ .onFailure ( throwable -> LOG .errorf ( "Failed to start a transaction: %s" , throwable .getMessage () ) )
374+ .toCompletionStage ()
375+ )
376+ .thenCompose ( CompletionStages ::voidFuture );
373377 }
374378
375379 @ Override
376380 public CompletionStage <Void > commitTransaction () {
377- return transaction . commit ()
378- .onSuccess ( v -> LOG . tracef ( "Transaction committed: %s" , transaction ) )
379- .onFailure ( v -> LOG .errorf ( "Failed to commit transaction : %s" , transaction ) )
380- .toCompletionStage ( )
381- .whenComplete ( this :: clearTransaction );
381+ return connection . transaction ()
382+ .commit ( )
383+ .onSuccess ( v -> LOG .tracef ( "Transaction committed : %s" , connection . transaction () ) )
384+ .onFailure ( throwable -> LOG . errorf ( "Failed to commit transaction: %s" , throwable . getMessage () ) )
385+ .toCompletionStage ( );
382386 }
383387
384388 @ Override
385389 public CompletionStage <Void > rollbackTransaction () {
386- return transaction .rollback ()
387- .onFailure ( v -> LOG .errorf ( "Failed to rollback transaction: %s" , transaction ) )
388- .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , transaction ) )
389- .toCompletionStage ()
390- .whenComplete ( this ::clearTransaction );
390+ if ( isTransactionInProgress () ) {
391+ return connection .transaction ()
392+ .rollback ()
393+ .onFailure ( throwable -> LOG .errorf ( "Failed to rollback transaction: %s" , throwable .getMessage () ) )
394+ .onSuccess ( v -> LOG .tracef ( "Transaction rolled back: %s" , connection .transaction () ) )
395+ .toCompletionStage ();
396+ }
397+ LOG .trace ( "No transaction found to roll back" );
398+ return voidFuture ();
391399 }
392400
393401 @ Override
394402 public CompletionStage <Void > close () {
395- if ( transaction != null ) {
396- throw new IllegalStateException ( "Connection being closed with a live transaction associated to it" );
397- }
398- return connection .close ()
399- .onSuccess ( event -> LOG .tracef ( "Connection closed: %s" , connection ) )
400- .onFailure ( v -> LOG .errorf ( "Failed to close a connection: %s" , connection ) )
401- .toCompletionStage ();
402- }
403-
404- private void setTransaction (Transaction tx ) {
405- transaction = tx ;
403+ // We can probably skip the validation if the connection is already closed...but, you never know
404+ return validateLiveTransaction ( "closing the connection" )
405+ .handle ( CompletionStages ::handle )
406+ .thenCompose ( validationHandler -> supplyStage ( () -> closed
407+ ? voidFuture ().thenAccept ( v -> LOG .trace ( "Connection already closed" ) )
408+ : connection .close ().toCompletionStage () )
409+ .handle ( CompletionStages ::handle )
410+ .thenCompose ( closeConnectionHandler -> {
411+ if ( closeConnectionHandler .hasFailed () ) {
412+ if ( validationHandler .hasFailed () ) {
413+ // Error closing the connection, include the validation error
414+ closeConnectionHandler .getThrowable ()
415+ .addSuppressed ( validationHandler .getThrowable () );
416+ }
417+ return closeConnectionHandler .getResultAsCompletionStage ();
418+ }
419+ if ( !closed ) {
420+ closed = true ;
421+ LOG .tracef ( "Connection closed: %s" , connection );
422+ }
423+ else {
424+ LOG .tracef ( "Connection was already closed: %s" , connection );
425+ }
426+ // Connection closed, return the result of the validation
427+ return validationHandler .getResultAsCompletionStage ();
428+ } )
429+ );
406430 }
407431
408- private void clearTransaction (Void v , Throwable x ) {
409- LOG .tracef ( "Clearing current transaction instance from connection: %s" , transaction );
410- transaction = null ;
432+ /**
433+ * If there's a transaction open, roll back it and return a failed CompletionStage.
434+ * @param operation It will be included in the error message
435+ */
436+ private CompletionStage <Void > validateLiveTransaction (String operation ) {
437+ if ( isTransactionInProgress () ) {
438+ return supplyStage ( this ::rollbackTransaction )
439+ .handle ( CompletionStages ::handle )
440+ .thenCompose ( rollbackHandler -> {
441+ final Throwable validationError = LOG .liveTransactionDetected ( operation );
442+ if ( rollbackHandler .hasFailed () ) {
443+ // Include the error that happened during rollback
444+ validationError .addSuppressed ( rollbackHandler .getThrowable () );
445+ }
446+ return failedFuture ( validationError );
447+ } );
448+ }
449+ return voidFuture ();
411450 }
412451
413452 private static class RowSetResult implements Result {
@@ -460,5 +499,4 @@ private static void translateNulls(Object[] paramValues) {
460499 }
461500 }
462501 }
463-
464502}
0 commit comments