Skip to content

Commit 0bac067

Browse files
authored
Remove Future.Computation and streamline task execution/cancellation (#3107)
This change removes the internal `Computation` indirection in `FutureImpl` and replaces it with a simplified, clearer execution model based directly on `Task`. Internalizing various execution modes inside `FutureImpl` enabled better cancellation handling in cases when futures were cancelled before running.
1 parent 85f2700 commit 0bac067

File tree

3 files changed

+105
-63
lines changed

3 files changed

+105
-63
lines changed

vavr/src/main/java/io/vavr/concurrent/FutureImpl.java

Lines changed: 29 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,13 @@
2222
import io.vavr.control.Option;
2323
import io.vavr.control.Try;
2424
import java.util.Objects;
25-
import java.util.concurrent.*;
25+
import java.util.concurrent.CancellationException;
26+
import java.util.concurrent.ExecutionException;
27+
import java.util.concurrent.Executor;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.ForkJoinPool;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.TimeoutException;
2632
import java.util.concurrent.locks.Lock;
2733
import java.util.concurrent.locks.LockSupport;
2834
import java.util.concurrent.locks.ReentrantLock;
@@ -32,7 +38,7 @@
3238
* <strong>INTERNAL API - This class is subject to change.</strong>
3339
*
3440
* @param <T> Result of the computation.
35-
* @author Daniel Dietrich
41+
* @author Daniel Dietrich, Grzegorz Piwowarek
3642
*/
3743
@SuppressWarnings("deprecation")
3844
final class FutureImpl<T> implements Future<T> {
@@ -84,9 +90,8 @@ final class FutureImpl<T> implements Future<T> {
8490
*/
8591
private Thread thread;
8692

87-
8893
// single constructor
89-
private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, Queue<Thread> waiters, Computation<T> computation) {
94+
private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T>>> actions, Queue<Thread> waiters, Task<? extends T> task) {
9095
this.lock = new ReentrantLock();
9196
this.executor = executor;
9297
lock.lock();
@@ -96,7 +101,22 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
96101
this.actions = actions;
97102
this.waiters = waiters;
98103
try {
99-
computation.execute(this::tryComplete, this::updateThread);
104+
if (task == null) {
105+
// no need to do anything
106+
} else if (task instanceof Task.SyncTask) {
107+
task.run(this::tryComplete);
108+
} else {
109+
executor.execute(() -> {
110+
if (!isCancelled()) {
111+
updateThread();
112+
try {
113+
task.run(this::tryComplete);
114+
} catch (Throwable x) {
115+
tryComplete(Try.failure(x));
116+
}
117+
}
118+
});
119+
}
100120
} catch (Throwable x) {
101121
tryComplete(Try.failure(x));
102122
}
@@ -113,7 +133,7 @@ private FutureImpl(Executor executor, Option<Try<T>> value, Queue<Consumer<Try<T
113133
* @return a new {@code FutureImpl} instance
114134
*/
115135
static <T> FutureImpl<T> of(Executor executor) {
116-
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) -> {});
136+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), null);
117137
}
118138

119139
/**
@@ -125,7 +145,7 @@ static <T> FutureImpl<T> of(Executor executor) {
125145
* @return a new {@code FutureImpl} instance
126146
*/
127147
static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
128-
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, (complete, updateThread) -> {});
148+
return new FutureImpl<>(executor, Option.some(Try.narrow(value)), null, null, null);
129149
}
130150

131151
/**
@@ -138,9 +158,7 @@ static <T> FutureImpl<T> of(Executor executor, Try<? extends T> value) {
138158
* @return a new {@code FutureImpl} instance
139159
*/
140160
static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
141-
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
142-
task.run(complete::with)
143-
);
161+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (Task.SyncTask<T>) complete -> task.run(complete::with));
144162
}
145163

146164
/**
@@ -154,16 +172,7 @@ static <T> FutureImpl<T> sync(Executor executor, Task<? extends T> task) {
154172
*/
155173
static <T> FutureImpl<T> async(Executor executor, Task<? extends T> task) {
156174
// In a single-threaded context this Future may already have been completed during initialization.
157-
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), (complete, updateThread) ->
158-
executor.execute(() -> {
159-
updateThread.run();
160-
try {
161-
task.run(complete::with);
162-
} catch (Throwable x) {
163-
complete.with(Try.failure(x));
164-
}
165-
})
166-
);
175+
return new FutureImpl<>(executor, Option.none(), Queue.empty(), Queue.empty(), task);
167176
}
168177

169178
@Override
@@ -425,8 +434,4 @@ private void unlock(Thread waiter) {
425434
private void handleUncaughtException(Throwable x) {
426435
tryComplete(Try.failure(x));
427436
}
428-
429-
private interface Computation<T> {
430-
void execute(Task.Complete<T> complete, Runnable updateThread) throws Throwable;
431-
}
432437
}

vavr/src/main/java/io/vavr/concurrent/Task.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,10 @@ public interface Task<T> {
2929
*/
3030
void run(Complete<T> complete) throws Throwable;
3131

32+
@FunctionalInterface
33+
interface SyncTask<T> extends Task<T> {
34+
}
35+
3236
/**
3337
* Completes a task.
3438
*

vavr/src/test/java/io/vavr/concurrent/FutureTest.java

Lines changed: 72 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.concurrent.ExecutionException;
4141
import java.util.concurrent.Executor;
4242
import java.util.concurrent.ExecutorService;
43+
import java.util.concurrent.Executors;
4344
import java.util.concurrent.ForkJoinPool;
4445
import java.util.concurrent.RejectedExecutionException;
4546
import java.util.concurrent.SynchronousQueue;
@@ -50,6 +51,7 @@
5051
import java.util.concurrent.atomic.AtomicReference;
5152
import java.util.function.Function;
5253
import org.assertj.core.api.IterableAssert;
54+
import org.junit.jupiter.api.Nested;
5355
import org.junit.jupiter.api.Test;
5456
import org.junit.jupiter.api.extension.AfterEachCallback;
5557
import org.junit.jupiter.api.extension.BeforeEachCallback;
@@ -59,6 +61,7 @@
5961
import static io.vavr.concurrent.Concurrent.waitUntil;
6062
import static io.vavr.concurrent.Concurrent.zZz;
6163
import static java.util.concurrent.TimeUnit.MILLISECONDS;
64+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
6265
import static org.junit.jupiter.api.Assertions.assertThrows;
6366

6467
@SuppressWarnings("deprecation")
@@ -112,7 +115,8 @@ protected <T> IterableAssert<T> assertThat(Iterable<T> actual) {
112115
@Override
113116
public IterableAssert<T> isEqualTo(Object expected) {
114117
if (actual instanceof Future && expected instanceof Future) {
115-
FutureTest.super.assertThat(((Future<T>) actual).getValue()).isEqualTo(((Future<T>) expected).getValue());
118+
FutureTest.super.assertThat(((Future<T>) actual).getValue())
119+
.isEqualTo(((Future<T>) expected).getValue());
116120
return this;
117121
} else {
118122
return super.isEqualTo(expected);
@@ -487,7 +491,7 @@ public void shouldReduceSequenceOfFutures() {
487491
public void shouldReduceWithErrorIfSequenceOfFuturesContainsOneError() {
488492
final Future<Integer> future = Future.reduce(
489493
List.of(Future.of(zZz(13)), Future.of(zZz(new Error()))),
490-
(i1, i2) -> i1 + i2
494+
Integer::sum
491495
).await();
492496
assertFailed(future, Error.class);
493497
}
@@ -692,46 +696,29 @@ public void shouldFallbackToThisFailure() {
692696
@Test
693697
public void shouldFoldEmptyIterable() {
694698
final Seq<Future<Integer>> futures = Stream.empty();
695-
final Future<Integer> testee = Future.fold(futures, 0, (a, b) -> a + b).await();
699+
final Future<Integer> testee = Future.fold(futures, 0, Integer::sum).await();
696700
assertThat(testee.getValue().get()).isEqualTo(Try.success(0));
697701
}
698702

699703
@Test
700704
public void shouldFoldNonEmptyIterableOfSucceedingFutures() {
701705
final Seq<Future<Integer>> futures = Stream.from(1).map(i -> Future.of(zZz(i))).take(5);
702-
final Future<Integer> testee = Future.fold(futures, 0, (a, b) -> a + b).await();
706+
final Future<Integer> testee = Future.fold(futures, 0, Integer::sum).await();
703707
assertThat(testee.getValue().get()).isEqualTo(Try.success(15));
704708
}
705709

706710
@Test
707711
public void shouldFoldNonEmptyIterableOfFailingFutures() {
708712
final Seq<Future<Integer>> futures = Stream.from(1).map(i -> Future.<Integer>of(zZz(new Error()))).take(5);
709-
final Future<Integer> testee = Future.fold(futures, 0, (a, b) -> a + b).await();
713+
final Future<Integer> testee = Future.fold(futures, 0, Integer::sum).await();
710714
assertFailed(testee, Error.class);
711715
}
712716

713-
// -- cancel()
714-
715-
@Test
716-
public void shouldInterruptLockedFuture() {
717-
final Object monitor = new Object();
718-
final AtomicBoolean running = new AtomicBoolean(false);
719-
final Future<?> future = blocking(() -> {
720-
synchronized (monitor) {
721-
running.set(true);
722-
monitor.wait(); // wait forever
723-
}
724-
});
725-
waitUntil(running::get);
726-
synchronized (monitor) {
727-
future.cancel();
728-
}
729-
assertThat(future.isCancelled()).isTrue();
730-
}
731-
732-
@Test
733-
public void shouldThrowOnGetAfterCancellation() {
734-
assertThrows(CancellationException.class, () -> {
717+
@Nested
718+
public class CancellationTests {
719+
720+
@Test
721+
public void shouldInterruptLockedFuture() {
735722
final Object monitor = new Object();
736723
final AtomicBoolean running = new AtomicBoolean(false);
737724
final Future<?> future = blocking(() -> {
@@ -745,21 +732,67 @@ public void shouldThrowOnGetAfterCancellation() {
745732
future.cancel();
746733
}
747734
assertThat(future.isCancelled()).isTrue();
748-
future.get();
749-
});
750-
}
735+
}
751736

752-
@Test
753-
public void shouldCancelFutureThatNeverCompletes() {
754-
@SuppressWarnings("deprecation") final Future<?> future = Future.run(complete -> {
755-
// we break our promise, the Future is never completed
756-
});
737+
@Test
738+
public void shouldThrowOnGetAfterCancellation() {
739+
assertThrows(CancellationException.class, () -> {
740+
final Object monitor = new Object();
741+
final AtomicBoolean running = new AtomicBoolean(false);
742+
final Future<?> future = blocking(() -> {
743+
synchronized (monitor) {
744+
running.set(true);
745+
monitor.wait(); // wait forever
746+
}
747+
});
748+
waitUntil(running::get);
749+
synchronized (monitor) {
750+
future.cancel();
751+
}
752+
assertThat(future.isCancelled()).isTrue();
753+
future.get();
754+
});
755+
}
757756

758-
assertThat(future.isCompleted()).isFalse();
759-
assertThat(future.isCancelled()).isFalse();
757+
@Test
758+
public void shouldCancelFutureThatNeverCompletes() {
759+
@SuppressWarnings("deprecation") final Future<?> future = Future.run(complete -> {
760+
// we break our promise, the Future is never completed
761+
});
760762

761-
assertThat(future.cancel()).isTrue();
762-
assertThat(future.isCompleted()).isTrue();
763+
assertThat(future.isCompleted()).isFalse();
764+
assertThat(future.isCancelled()).isFalse();
765+
766+
assertThat(future.cancel()).isTrue();
767+
assertThat(future.isCompleted()).isTrue();
768+
}
769+
770+
@Test
771+
void shouldNotRunCancelledFuture() {
772+
ExecutorService es = Executors.newSingleThreadExecutor();
773+
774+
AtomicBoolean f1Executed = new AtomicBoolean(false);
775+
AtomicBoolean f2Executed = new AtomicBoolean(false);
776+
777+
Future<Void> f1 = Future.run(es, () -> {Thread.sleep(10000);f1Executed.set(true);});
778+
Future<Void> f2 = Future.run(es, () -> {f2Executed.set(true);Thread.sleep(10000);});
779+
780+
f2.cancel(true);
781+
f1.cancel(true);
782+
783+
f1.await();
784+
f2.await();
785+
786+
assertThat(f1.isCancelled()).isTrue();
787+
assertThat(f2.isCancelled()).isTrue();
788+
789+
assertThat(f1.isFailure()).isTrue();
790+
assertThat(f2.isFailure()).isTrue();
791+
assertThatThrownBy(f1::get).isInstanceOf(CancellationException.class);
792+
assertThatThrownBy(f2::get).isInstanceOf(CancellationException.class);
793+
assertThat(f1Executed.get()).isFalse();
794+
assertThat(f2Executed.get()).isFalse();
795+
}
763796
}
764797

765798
// -- collect()

0 commit comments

Comments
 (0)