Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
* Emulate {@link org.hibernate.query.spi.AbstractSelectionQuery}.
* <p>
* Hibernate Reactive implementations already extend another class,
* they cannot extends {@link org.hibernate.query.spi.AbstractSelectionQuery too}.
* they cannot extend {@link org.hibernate.query.spi.AbstractSelectionQuery too}.
* This approach allows us to avoid duplicating code.
* </p>
* @param <R>
Expand All @@ -74,7 +74,7 @@ public class ReactiveAbstractSelectionQuery<R> {

private Set<String> fetchProfiles;

private final Runnable beforeQuery;
private final Supplier<CompletionStage<Void>> beforeQuery;

private final Consumer<Boolean> afterQuery;
private final Function<List<R>, R> uniqueElement;
Expand All @@ -93,7 +93,7 @@ public ReactiveAbstractSelectionQuery(
Supplier<DomainParameterXref> getDomainParameterXref,
Supplier<Class<?>> getResultType,
Supplier<String> getQueryString,
Runnable beforeQuery,
Supplier<CompletionStage<Void>> beforeQuery,
Consumer<Boolean> afterQuery,
Function<List<R>, R> uniqueElement) {
this(
Expand Down Expand Up @@ -121,7 +121,7 @@ public ReactiveAbstractSelectionQuery(
Supplier<DomainParameterXref> getDomainParameterXref,
Supplier<Class<?>> getResultType,
Supplier<String> getQueryString,
Runnable beforeQuery,
Supplier<CompletionStage<Void>> beforeQuery,
Consumer<Boolean> afterQuery,
Function<List<R>, R> uniqueElement,
InterpretationsKeySource interpretationsKeySource) {
Expand Down Expand Up @@ -200,8 +200,8 @@ private LockOptions getLockOptions() {

public CompletionStage<List<R>> reactiveList() {
final Set<String> profiles = applyProfiles();
beforeQuery.run();
return doReactiveList()
return beforeQuery.get()
.thenCompose( v -> doReactiveList() )
.handle( (list, error) -> {
handleException( error );
return list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.hibernate.reactive.query.sql.spi.ReactiveNativeQueryImplementor;
import org.hibernate.reactive.query.sql.spi.ReactiveNonSelectQueryPlan;
import org.hibernate.reactive.query.sqm.spi.ReactiveSelectQueryPlan;
import org.hibernate.reactive.session.ReactiveSession;
import org.hibernate.sql.exec.spi.Callback;
import org.hibernate.type.BasicTypeReference;

Expand All @@ -52,8 +53,11 @@
import jakarta.persistence.LockModeType;
import jakarta.persistence.Parameter;
import jakarta.persistence.TemporalType;
import jakarta.persistence.metamodel.Type;
import jakarta.persistence.metamodel.SingularAttribute;
import jakarta.persistence.metamodel.Type;

import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;


public class ReactiveNativeQueryImpl<R> extends NativeQueryImpl<R>
implements ReactiveNativeQueryImplementor<R> {
Expand Down Expand Up @@ -123,13 +127,46 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
this::getNull,
this::getNull,
this::getQueryString,
this::beforeQuery,
this::reactiveBeforeQuery,
this::afterQuery,
AbstractSelectionQuery::uniqueElement,
null
);
}

protected CompletionStage<Void> reactiveBeforeQuery() {
getQueryParameterBindings().validate();

final var session = getSession();
session.prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions().getLockMode() ) );
return reactivePrepareForExecution()
.thenAccept( v -> {
prepareSessionFlushMode( session );
prepareSessionCacheMode( session );
} );
}

protected CompletionStage<Void> reactivePrepareForExecution() {
final var spaces = getSynchronizedQuerySpaces();
if ( spaces == null || spaces.isEmpty() ) {
// We need to flush. The query itself is not required to execute in a
// transaction; if there is no transaction, the flush would throw a
// TransactionRequiredException which would potentially break existing
// apps, so we only do the flush if a transaction is in progress.
if ( shouldFlush() ) {
return ( (ReactiveSession) getSession() )
.reactiveFlush()
.thenAccept( v -> resetCallback() );
}
// Reset the callback before every execution
resetCallback();
}
// Otherwise, the application specified query spaces via the Hibernate
// SynchronizeableQuery and so the query will already perform a partial
// flush according to the defined query spaces - no need for a full flush.
return voidFuture();
}

private CompletionStage<List<R>> doReactiveList() {
return reactiveSelectPlan().reactivePerformList( this );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
import jakarta.persistence.TemporalType;
import jakarta.persistence.metamodel.Type;

import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

/**
* A reactive {@link SqmQueryImpl}
*/
Expand Down Expand Up @@ -124,12 +127,22 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
this::getDomainParameterXref,
this::getResultType,
this::getQueryString,
this::beforeQuery,
this::reactiveBeforeQuery,
this::afterQuery,
AbstractSelectionQuery::uniqueElement
);
}

private CompletionStage<Void> reactiveBeforeQuery() {
try {
beforeQuery();
return voidFuture();
}
catch (Throwable e) {
return failedFuture( e );
}
}

@Override
public CompletionStage<R> reactiveUnique() {
return selectionQueryDelegate.reactiveUnique();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.stream.Stream;

import static org.hibernate.query.spi.SqlOmittingQueryOptions.omitSqlQueryOptions;
import static org.hibernate.reactive.util.impl.CompletionStages.failedFuture;
import static org.hibernate.reactive.util.impl.CompletionStages.voidFuture;

/**
* A reactive {@link SqmSelectionQueryImpl}
Expand Down Expand Up @@ -83,15 +85,24 @@ private ReactiveAbstractSelectionQuery<R> createSelectionQueryDelegate(SharedSes
this::getDomainParameterXref,
this::getResultType,
this::getQueryString,
this::beforeQuery,
this::reactiveBeforeQuery,
this::afterQuery,
AbstractSelectionQuery::uniqueElement
);
}

private CompletionStage<Void> reactiveBeforeQuery() {
try {
beforeQuery();
return voidFuture();
}
catch (Throwable e) {
return failedFuture( e );
}
}

private CompletionStage<List<R>> doReactiveList() {
getSession().prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions()
.findGreatestLockMode() ) );
getSession().prepareForQueryExecution( requiresTxn( getQueryOptions().getLockOptions().findGreatestLockMode() ) );

final SqmSelectStatement<?> sqmStatement = getSqmStatement();
final boolean containsCollectionFetches = sqmStatement.containsCollectionFetches();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@ public class ReactiveStatelessSessionImpl extends StatelessSessionImpl implement
private final ReactiveConnection reactiveConnection;
private final ReactiveStatelessSessionImpl batchingHelperSession;
private final PersistenceContext persistenceContext;
private final boolean connectionProvided;

public ReactiveStatelessSessionImpl(SessionFactoryImpl factory, SessionCreationOptions options, ReactiveConnection connection) {
super( factory, options );
connectionProvided = options.getConnection() != null;
reactiveConnection = connection;
persistenceContext = new ReactivePersistenceContextAdapter( super.getPersistenceContext() );
batchingHelperSession = new ReactiveStatelessSessionImpl( factory, options, reactiveConnection, persistenceContext );
Expand All @@ -150,6 +152,7 @@ private ReactiveStatelessSessionImpl(
ReactiveConnection connection,
PersistenceContext persistenceContext) {
super( factory, options );
connectionProvided = options.getConnection() != null;
this.persistenceContext = persistenceContext;
// StatelessSession should not allow JDBC batching, because that would change
// its "immediate synchronous execution" model into something more like transactional
Expand Down Expand Up @@ -1019,6 +1022,12 @@ public void prepareForQueryExecution(boolean requiresTxn) {
// }
}

@Override
public boolean isTransactionInProgress() {
return connectionProvided || ( isOpenOrWaitingForAutoClose()
&& reactiveConnection.isTransactionInProgress() );
}

@Override
public <R> ReactiveSqmQueryImplementor<R> createReactiveQuery(String queryString, Class<R> expectedResultType) {
checkOpen();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.reactive;

import org.hibernate.FlushMode;

import org.junit.jupiter.api.Test;

import io.vertx.junit5.VertxTestContext;

import static org.assertj.core.api.Assertions.assertThat;

public class NoEntitiesTest extends BaseReactiveTest {

@Test
public void emptyMetamodelWithMutiny() {
assertThat( getMutinySessionFactory().getMetamodel().getEntities() ).isEmpty();
}

@Test
public void shouldBeAbleToRunQueryWithMutinyTransaction(VertxTestContext context) {
test( context, getMutinySessionFactory()
.withTransaction( s -> s
.createNativeQuery( "select 42", Long.class ).getSingleResult()
).invoke( result -> assertThat( result ).isEqualTo( 42L ) )
);
}

@Test
public void runNativeQueryWithMutinyTransactionAndFlush(VertxTestContext context) {
test( context, getMutinySessionFactory()
.withTransaction( s -> {
s.setFlushMode( FlushMode.ALWAYS );
return s
.createNativeQuery( "select 42", Long.class ).getSingleResult()
.call( s::flush );
} ).invoke( result -> assertThat( result ).isEqualTo( 42L ) )
);
}

@Test
public void runStatelessNativeQueryWithMutinyTransactionAndFlush(VertxTestContext context) {
test(
context, getMutinySessionFactory()
.withStatelessTransaction( s -> s
.createNativeQuery( "select 42", Long.class )
.getSingleResult()
)
.invoke( result -> assertThat( result ).isEqualTo( 42L ) )
);
}

@Test
public void shouldBeAbleToRunQueryWithMutinyWithoutTransaction(VertxTestContext context) {
test( context, getMutinySessionFactory()
.openSession().chain( s -> s
.createNativeQuery( "select 666", Long.class ).getSingleResult()
).invoke( result -> assertThat( result ).isEqualTo( 666L ) )
);
}
}
Loading