Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,12 @@ public interface Log extends BasicLogger {
@Message(id = 89, value = "Connection is closed")
IllegalStateException connectionIsClosed();

@Message(id = 90, value = "Live transaction detected while closing the connection: it will be roll backed")
IllegalStateException liveTransactionDetectedOnClose();

@Message(id = 91, value = "Can't begin a new transaction as an active transaction is already associated to this connection")
IllegalStateException liveTransactionDetectedOnBeginTransaction();

// Same method that exists in CoreMessageLogger
@LogMessage(level = WARN)
@Message(id = 104, value = "firstResult/maxResults specified with collection fetch; applying in memory!" )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ public Mutiny.Transaction currentTransaction() {
}

private class Transaction<T> implements Mutiny.Transaction {
boolean rollback;
boolean markedForRollback;

Uni<T> execute(Function<Mutiny.Transaction, Uni<T>> work) {
currentTransaction = this;
Expand All @@ -504,13 +504,16 @@ Uni<T> executeInTransaction(Function<Mutiny.Transaction, Uni<T>> work) {
return Uni.createFrom()
.deferred( () -> work.apply( this ) )
// only flush() if the work completed with no exception
.call( this::flush ).call( this::beforeCompletion )
.call( this::flush )
.call( this::beforeCompletion )
// in the case of an exception or cancellation
// we need to roll back the transaction
.onFailure().call( this::rollback ).onCancellation().call( this::rollback )
.onFailure().call( this::rollback )
.onCancellation().call( this::rollback )
// finally, when there was no exception,
// commit or rollback the transaction
.call( () -> rollback ? rollback() : commit() ).call( this::afterCompletion );
.call( () -> markedForRollback ? rollback() : commit() )
.call( this::afterCompletion );
}

Uni<Void> flush() {
Expand All @@ -534,17 +537,17 @@ private Uni<Void> beforeCompletion() {
}

private Uni<Void> afterCompletion() {
return Uni.createFrom().completionStage( delegate.getReactiveActionQueue().afterTransactionCompletion( !rollback ) );
return Uni.createFrom().completionStage( delegate.getReactiveActionQueue().afterTransactionCompletion( !markedForRollback ) );
}

@Override
public void markForRollback() {
rollback = true;
markedForRollback = true;
}

@Override
public boolean isMarkedForRollback() {
return rollback;
return markedForRollback;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@
import io.vertx.sqlclient.RowSet;
import io.vertx.sqlclient.SqlConnection;
import io.vertx.sqlclient.SqlResult;
import io.vertx.sqlclient.Transaction;
import io.vertx.sqlclient.Tuple;
import io.vertx.sqlclient.spi.DatabaseMetadata;

import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.rethrow;
import static org.hibernate.reactive.util.impl.CompletionStages.supplyStage;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

/**
Expand All @@ -62,7 +63,10 @@ public class SqlClientConnection implements ReactiveConnection {
private final SqlConnection connection;
// The context associated to the connection. We expect the connection to be executed in this context.
private final ContextInternal connectionContext;
private Transaction transaction;

// The close operation could be called multiple times if an error occurs,
// if we execute it every time, we will have several useless messages in the log
private boolean closed = false;

SqlClientConnection(
SqlConnection connection,
Expand Down Expand Up @@ -362,52 +366,88 @@ private SqlConnection client() {

@Override
public CompletionStage<Void> beginTransaction() {
if ( transaction != null ) {
throw new IllegalStateException( "Can't begin a new transaction as an active transaction is already associated to this connection" );
if ( isTransactionInProgress() ) {
return failedFuture( LOG.liveTransactionDetectedOnBeginTransaction() );
}
return connection.begin()
.onSuccess( tx -> LOG.tracef( "Transaction started: %s", tx ) )
.onFailure( v -> LOG.errorf( "Failed to start a transaction: %s", transaction ) )
.onFailure( throwable -> LOG.errorf( "Failed to start a transaction: %s", throwable.getMessage() ) )
.toCompletionStage()
.thenAccept( this::setTransaction );
.thenCompose( CompletionStages::voidFuture );
}

@Override
public CompletionStage<Void> commitTransaction() {
return transaction.commit()
.onSuccess( v -> LOG.tracef( "Transaction committed: %s", transaction ) )
.onFailure( v -> LOG.errorf( "Failed to commit transaction: %s", transaction ) )
.toCompletionStage()
.whenComplete( this::clearTransaction );
return connection.transaction()
.commit()
.onSuccess( v -> LOG.tracef( "Transaction committed: %s", connection.transaction() ) )
.onFailure( throwable -> LOG.errorf( "Failed to commit transaction: %s", throwable.getMessage() ) )
.toCompletionStage();
}

@Override
public CompletionStage<Void> rollbackTransaction() {
return transaction.rollback()
.onFailure( v -> LOG.errorf( "Failed to rollback transaction: %s", transaction ) )
.onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", transaction ) )
.toCompletionStage()
.whenComplete( this::clearTransaction );
if ( isTransactionInProgress() ) {
return connection.transaction()
.rollback()
.onFailure( throwable -> LOG.errorf( "Failed to rollback transaction: %s", throwable.getMessage() ) )
.onSuccess( v -> LOG.tracef( "Transaction rolled back: %s", connection.transaction() ) )
.toCompletionStage();
}
LOG.trace( "No transaction found to roll back" );
return voidFuture();
}

@Override
public CompletionStage<Void> close() {
if ( transaction != null ) {
throw new IllegalStateException( "Connection being closed with a live transaction associated to it" );
}
return connection.close()
.onSuccess( event -> LOG.tracef( "Connection closed: %s", connection ) )
.onFailure( v -> LOG.errorf( "Failed to close a connection: %s", connection ) )
.toCompletionStage();
}

private void setTransaction(Transaction tx) {
transaction = tx;
// We can probably skip the validation if the connection is already closed...but, you never know
return validateNoTransactionInProgressOnClose()
.handle( CompletionStages::handle )
.thenCompose( validationHandler -> supplyStage( () -> closed
? voidFuture().thenAccept( v -> LOG.trace( "Connection already closed" ) )
: connection.close().toCompletionStage() )
.handle( CompletionStages::handle )
.thenCompose( closeConnectionHandler -> {
if ( closeConnectionHandler.hasFailed() ) {
if ( validationHandler.hasFailed() ) {
// Error closing the connection, include the validation error
closeConnectionHandler.getThrowable()
.addSuppressed( validationHandler.getThrowable() );
}
// Return a failed CompletionStage
return closeConnectionHandler.getResultAsCompletionStage();
}
if ( !closed ) {
closed = true;
LOG.tracef( "Connection closed: %s", connection );
}
else {
LOG.tracef( "Connection was already closed: %s", connection );
}
// Connection closed, return the result of the validation
return validationHandler.getResultAsCompletionStage();
} )
);
}

private void clearTransaction(Void v, Throwable x) {
LOG.tracef( "Clearing current transaction instance from connection: %s", transaction );
transaction = null;
/**
* If there's a transaction open, roll back it and return a failed CompletionStage.
* The validation error is related to closing the connection.
*/
private CompletionStage<Void> validateNoTransactionInProgressOnClose() {
if ( isTransactionInProgress() ) {
return supplyStage( this::rollbackTransaction )
.handle( CompletionStages::handle )
.thenCompose( rollbackHandler -> {
final Throwable validationError = LOG.liveTransactionDetectedOnClose();
if ( rollbackHandler.hasFailed() ) {
// Include the error that happened during rollback
validationError.addSuppressed( rollbackHandler.getThrowable() );
}
return failedFuture( validationError );
} );
}
return voidFuture();
}

private static class RowSetResult implements Result {
Expand Down Expand Up @@ -460,5 +500,4 @@ private static void translateNulls(Object[] paramValues) {
}
}
}

}
Loading