1515
1616package software .amazon .awssdk .transfer .s3 .internal ;
1717
18- import java .util .Optional ;
1918import java .util .concurrent .CompletableFuture ;
20- import java .util .concurrent .atomic .AtomicBoolean ;
2119import java .util .concurrent .atomic .AtomicInteger ;
2220import java .util .function .Function ;
2321import org .reactivestreams .Subscriber ;
2422import org .reactivestreams .Subscription ;
2523import software .amazon .awssdk .annotations .SdkInternalApi ;
2624import software .amazon .awssdk .utils .Logger ;
2725import software .amazon .awssdk .utils .Validate ;
28- import software .amazon .awssdk .utils .async .DemandIgnoringSubscription ;
29- import software .amazon .awssdk .utils .async .StoringSubscriber ;
3026
3127/**
3228 * An implementation of {@link Subscriber} that execute the provided function for every event and limits the number of concurrent
@@ -41,20 +37,16 @@ public class AsyncBufferingSubscriber<T> implements Subscriber<T> {
4137 private final Function <T , CompletableFuture <?>> consumer ;
4238 private final int maxConcurrentExecutions ;
4339 private final AtomicInteger numRequestsInFlight ;
44- private final AtomicBoolean isDelivering = new AtomicBoolean (false );
45- private volatile boolean isStreamingDone ;
40+ private volatile boolean upstreamDone ;
4641 private Subscription subscription ;
4742
48- private final StoringSubscriber <T > storingSubscriber ;
49-
5043 public AsyncBufferingSubscriber (Function <T , CompletableFuture <?>> consumer ,
5144 CompletableFuture <Void > returnFuture ,
5245 int maxConcurrentExecutions ) {
5346 this .returnFuture = returnFuture ;
5447 this .consumer = consumer ;
5548 this .maxConcurrentExecutions = maxConcurrentExecutions ;
5649 this .numRequestsInFlight = new AtomicInteger (0 );
57- this .storingSubscriber = new StoringSubscriber <>(Integer .MAX_VALUE );
5850 }
5951
6052 @ Override
@@ -65,89 +57,41 @@ public void onSubscribe(Subscription subscription) {
6557 subscription .cancel ();
6658 return ;
6759 }
68- storingSubscriber .onSubscribe (new DemandIgnoringSubscription (subscription ));
6960 this .subscription = subscription ;
7061 subscription .request (maxConcurrentExecutions );
7162 }
7263
7364 @ Override
7465 public void onNext (T item ) {
75- storingSubscriber .onNext (item );
76- flushBufferIfNeeded ();
77- }
78-
79- private void flushBufferIfNeeded () {
80- if (isDelivering .compareAndSet (false , true )) {
81- try {
82- Optional <StoringSubscriber .Event <T >> next = storingSubscriber .peek ();
83- while (numRequestsInFlight .get () < maxConcurrentExecutions ) {
84- if (!next .isPresent ()) {
85- subscription .request (1 );
86- break ;
87- }
88-
89- switch (next .get ().type ()) {
90- case ON_COMPLETE :
91- handleCompleteEvent ();
92- break ;
93- case ON_ERROR :
94- handleError (next .get ().runtimeError ());
95- break ;
96- case ON_NEXT :
97- handleOnNext (next .get ().value ());
98- break ;
99- default :
100- handleError (new IllegalStateException ("Unknown stored type: " + next .get ().type ()));
101- break ;
102- }
103-
104- next = storingSubscriber .peek ();
105- }
106- } finally {
107- isDelivering .set (false );
108- }
109- }
110- }
111-
112- private void handleOnNext (T item ) {
113- storingSubscriber .poll ();
114-
115- int numberOfRequestInFlight = numRequestsInFlight .incrementAndGet ();
116- log .debug (() -> "Delivering next item, numRequestInFlight=" + numberOfRequestInFlight );
117-
66+ numRequestsInFlight .incrementAndGet ();
11867 consumer .apply (item ).whenComplete ((r , t ) -> {
119- numRequestsInFlight .decrementAndGet ();
120- if (! isStreamingDone ) {
68+ checkForCompletion ( numRequestsInFlight .decrementAndGet () );
69+ synchronized ( this ) {
12170 subscription .request (1 );
122- } else {
123- flushBufferIfNeeded ();
12471 }
12572 });
12673 }
12774
128- private void handleCompleteEvent () {
129- if (numRequestsInFlight .get () == 0 ) {
130- returnFuture .complete (null );
131- storingSubscriber .poll ();
132- }
133- }
134-
13575 @ Override
13676 public void onError (Throwable t ) {
137- handleError (t );
138- storingSubscriber .onError (t );
139- }
140-
141- private void handleError (Throwable t ) {
77+ // Need to complete future exceptionally first to prevent
78+ // accidental successful completion by a concurrent checkForCompletion.
14279 returnFuture .completeExceptionally (t );
143- storingSubscriber . poll () ;
80+ upstreamDone = true ;
14481 }
14582
14683 @ Override
14784 public void onComplete () {
148- isStreamingDone = true ;
149- storingSubscriber .onComplete ();
150- flushBufferIfNeeded ();
85+ upstreamDone = true ;
86+ checkForCompletion (numRequestsInFlight .get ());
87+ }
88+
89+ private void checkForCompletion (int requestsInFlight ) {
90+ if (upstreamDone && requestsInFlight == 0 ) {
91+ // This could get invoked multiple times, but it doesn't matter
92+ // because future.complete is idempotent.
93+ returnFuture .complete (null );
94+ }
15195 }
15296
15397 /**
0 commit comments