@@ -618,24 +618,23 @@ private static <T> DispatchResult<T> emptyDispatchResult() {
618618 return (DispatchResult <T >) EMPTY_DISPATCH_RESULT ;
619619 }
620620
621- private class DataLoaderSubscriber implements Subscriber <V > {
621+ private abstract class DataLoaderSubscriberBase < T > implements Subscriber <T > {
622622
623- private final CompletableFuture <List <V >> valuesFuture ;
624- private final List <K > keys ;
625- private final List <Object > callContexts ;
626- private final List <CompletableFuture <V >> queuedFutures ;
623+ final CompletableFuture <List <V >> valuesFuture ;
624+ final List <K > keys ;
625+ final List <Object > callContexts ;
626+ final List <CompletableFuture <V >> queuedFutures ;
627627
628- private final List <K > clearCacheKeys = new ArrayList <>();
629- private final List <V > completedValues = new ArrayList <>();
630- private int idx = 0 ;
631- private boolean onErrorCalled = false ;
632- private boolean onCompleteCalled = false ;
628+ List <K > clearCacheKeys = new ArrayList <>();
629+ List <V > completedValues = new ArrayList <>();
630+ boolean onErrorCalled = false ;
631+ boolean onCompleteCalled = false ;
633632
634- private DataLoaderSubscriber (
635- CompletableFuture <List <V >> valuesFuture ,
636- List <K > keys ,
637- List <Object > callContexts ,
638- List <CompletableFuture <V >> queuedFutures
633+ DataLoaderSubscriberBase (
634+ CompletableFuture <List <V >> valuesFuture ,
635+ List <K > keys ,
636+ List <Object > callContexts ,
637+ List <CompletableFuture <V >> queuedFutures
639638 ) {
640639 this .valuesFuture = valuesFuture ;
641640 this .keys = keys ;
@@ -648,40 +647,87 @@ public void onSubscribe(Subscription subscription) {
648647 subscription .request (keys .size ());
649648 }
650649
651- // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
652- // correctness (at the cost of speed).
653650 @ Override
654- public synchronized void onNext (V value ) {
651+ public void onNext (T v ) {
655652 assertState (!onErrorCalled , () -> "onError has already been called; onNext may not be invoked." );
656653 assertState (!onCompleteCalled , () -> "onComplete has already been called; onNext may not be invoked." );
654+ }
657655
658- K key = keys .get (idx );
659- Object callContext = callContexts .get (idx );
660- CompletableFuture <V > future = queuedFutures .get (idx );
656+ @ Override
657+ public void onComplete () {
658+ assertState (!onErrorCalled , () -> "onError has already been called; onComplete may not be invoked." );
659+ onCompleteCalled = true ;
660+ }
661+
662+ @ Override
663+ public void onError (Throwable throwable ) {
664+ assertState (!onCompleteCalled , () -> "onComplete has already been called; onError may not be invoked." );
665+ onErrorCalled = true ;
666+
667+ stats .incrementBatchLoadExceptionCount (new IncrementBatchLoadExceptionCountStatisticsContext <>(keys , callContexts ));
668+ }
669+
670+ /*
671+ * A value has arrived - how do we complete the future that's associated with it in a common way
672+ */
673+ void onNextValue (K key , V value , Object callContext , CompletableFuture <V > future ) {
661674 if (value instanceof Try ) {
662675 // we allow the batch loader to return a Try so we can better represent a computation
663676 // that might have worked or not.
677+ //noinspection unchecked
664678 Try <V > tryValue = (Try <V >) value ;
665679 if (tryValue .isSuccess ()) {
666680 future .complete (tryValue .get ());
667681 } else {
668682 stats .incrementLoadErrorCount (new IncrementLoadErrorCountStatisticsContext <>(key , callContext ));
669683 future .completeExceptionally (tryValue .getThrowable ());
670- clearCacheKeys .add (keys . get ( idx ) );
684+ clearCacheKeys .add (key );
671685 }
672686 } else {
673687 future .complete (value );
674688 }
689+ }
690+
691+ Throwable unwrapThrowable (Throwable ex ) {
692+ if (ex instanceof CompletionException ) {
693+ ex = ex .getCause ();
694+ }
695+ return ex ;
696+ }
697+ }
698+
699+ private class DataLoaderSubscriber extends DataLoaderSubscriberBase <V > {
700+
701+ private int idx = 0 ;
702+
703+ private DataLoaderSubscriber (
704+ CompletableFuture <List <V >> valuesFuture ,
705+ List <K > keys ,
706+ List <Object > callContexts ,
707+ List <CompletableFuture <V >> queuedFutures
708+ ) {
709+ super (valuesFuture , keys , callContexts , queuedFutures );
710+ }
711+
712+ // onNext may be called by multiple threads - for the time being, we pass 'synchronized' to guarantee
713+ // correctness (at the cost of speed).
714+ @ Override
715+ public synchronized void onNext (V value ) {
716+ super .onNext (value );
717+
718+ K key = keys .get (idx );
719+ Object callContext = callContexts .get (idx );
720+ CompletableFuture <V > future = queuedFutures .get (idx );
721+ onNextValue (key , value , callContext , future );
675722
676723 completedValues .add (value );
677724 idx ++;
678725 }
679726
727+
680728 @ Override
681729 public void onComplete () {
682- assertState (!onErrorCalled , () -> "onError has already been called; onComplete may not be invoked." );
683- onCompleteCalled = true ;
684-
730+ super .onComplete ();
685731 assertResultSize (keys , completedValues );
686732
687733 possiblyClearCacheEntriesOnExceptions (clearCacheKeys );
@@ -690,13 +736,8 @@ public void onComplete() {
690736
691737 @ Override
692738 public void onError (Throwable ex ) {
693- assertState (!onCompleteCalled , () -> "onComplete has already been called; onError may not be invoked." );
694- onErrorCalled = true ;
695-
696- stats .incrementBatchLoadExceptionCount (new IncrementBatchLoadExceptionCountStatisticsContext <>(keys , callContexts ));
697- if (ex instanceof CompletionException ) {
698- ex = ex .getCause ();
699- }
739+ super .onError (ex );
740+ ex = unwrapThrowable (ex );
700741 // Set the remaining keys to the exception.
701742 for (int i = idx ; i < queuedFutures .size (); i ++) {
702743 K key = keys .get (i );
@@ -706,32 +747,23 @@ public void onError(Throwable ex) {
706747 dataLoader .clear (key );
707748 }
708749 }
750+
709751 }
710752
711- private class DataLoaderMapEntrySubscriber implements Subscriber <Map .Entry <K , V >> {
712- private final CompletableFuture <List <V >> valuesFuture ;
713- private final List <K > keys ;
714- private final List <Object > callContexts ;
715- private final List <CompletableFuture <V >> queuedFutures ;
753+ private class DataLoaderMapEntrySubscriber extends DataLoaderSubscriberBase <Map .Entry <K , V >> {
754+
716755 private final Map <K , Object > callContextByKey ;
717756 private final Map <K , CompletableFuture <V >> queuedFutureByKey ;
718-
719- private final List <K > clearCacheKeys = new ArrayList <>();
720757 private final Map <K , V > completedValuesByKey = new HashMap <>();
721- private boolean onErrorCalled = false ;
722- private boolean onCompleteCalled = false ;
758+
723759
724760 private DataLoaderMapEntrySubscriber (
725- CompletableFuture <List <V >> valuesFuture ,
726- List <K > keys ,
727- List <Object > callContexts ,
728- List <CompletableFuture <V >> queuedFutures
761+ CompletableFuture <List <V >> valuesFuture ,
762+ List <K > keys ,
763+ List <Object > callContexts ,
764+ List <CompletableFuture <V >> queuedFutures
729765 ) {
730- this .valuesFuture = valuesFuture ;
731- this .keys = keys ;
732- this .callContexts = callContexts ;
733- this .queuedFutures = queuedFutures ;
734-
766+ super (valuesFuture ,keys ,callContexts ,queuedFutures );
735767 this .callContextByKey = new HashMap <>();
736768 this .queuedFutureByKey = new HashMap <>();
737769 for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
@@ -743,42 +775,24 @@ private DataLoaderMapEntrySubscriber(
743775 }
744776 }
745777
746- @ Override
747- public void onSubscribe (Subscription subscription ) {
748- subscription .request (keys .size ());
749- }
750778
751779 @ Override
752780 public void onNext (Map .Entry <K , V > entry ) {
753- assertState (!onErrorCalled , () -> "onError has already been called; onNext may not be invoked." );
754- assertState (!onCompleteCalled , () -> "onComplete has already been called; onNext may not be invoked." );
781+ super .onNext (entry );
755782 K key = entry .getKey ();
756783 V value = entry .getValue ();
757784
758785 Object callContext = callContextByKey .get (key );
759786 CompletableFuture <V > future = queuedFutureByKey .get (key );
760- if (value instanceof Try ) {
761- // we allow the batch loader to return a Try so we can better represent a computation
762- // that might have worked or not.
763- Try <V > tryValue = (Try <V >) value ;
764- if (tryValue .isSuccess ()) {
765- future .complete (tryValue .get ());
766- } else {
767- stats .incrementLoadErrorCount (new IncrementLoadErrorCountStatisticsContext <>(key , callContext ));
768- future .completeExceptionally (tryValue .getThrowable ());
769- clearCacheKeys .add (key );
770- }
771- } else {
772- future .complete (value );
773- }
787+
788+ onNextValue (key , value , callContext , future );
774789
775790 completedValuesByKey .put (key , value );
776791 }
777792
778793 @ Override
779794 public void onComplete () {
780- assertState (!onErrorCalled , () -> "onError has already been called; onComplete may not be invoked." );
781- onCompleteCalled = true ;
795+ super .onComplete ();
782796
783797 possiblyClearCacheEntriesOnExceptions (clearCacheKeys );
784798 List <V > values = new ArrayList <>(keys .size ());
@@ -791,13 +805,8 @@ public void onComplete() {
791805
792806 @ Override
793807 public void onError (Throwable ex ) {
794- assertState (!onCompleteCalled , () -> "onComplete has already been called; onError may not be invoked." );
795- onErrorCalled = true ;
796-
797- stats .incrementBatchLoadExceptionCount (new IncrementBatchLoadExceptionCountStatisticsContext <>(keys , callContexts ));
798- if (ex instanceof CompletionException ) {
799- ex = ex .getCause ();
800- }
808+ super .onError (ex );
809+ ex = unwrapThrowable (ex );
801810 // Complete the futures for the remaining keys with the exception.
802811 for (int idx = 0 ; idx < queuedFutures .size (); idx ++) {
803812 K key = keys .get (idx );
0 commit comments