From 31b407f3d66a1c36dcadd37a85f89b68ec62ff16 Mon Sep 17 00:00:00 2001 From: Lugduni Desrosiers <36016544+ddunig2@users.noreply.github.com> Date: Fri, 6 Dec 2019 02:58:12 -0500 Subject: [PATCH] backporting #6729 to 2.x branch. (#6746) --- src/main/java/io/reactivex/Flowable.java | 10 +++++++--- src/main/java/io/reactivex/Observable.java | 11 ++++++++--- .../io/reactivex/disposables/ActionDisposable.java | 3 +++ .../flowable/BlockingFlowableMostRecent.java | 4 ---- .../observable/BlockingObservableMostRecent.java | 4 ---- 5 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 0490daf3c2..41cde991a7 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -8266,7 +8266,6 @@ public final Single contains(final Object item) { * @return a Single that emits a single item: the number of items emitted by the source Publisher as a * 64-bit Long item * @see ReactiveX operators documentation: Count - * @see #count() */ @CheckReturnValue @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) @@ -15121,6 +15120,7 @@ public final Flowable switchIfEmpty(Publisher other) { * Publisher * @return a Flowable that emits the items emitted by the Publisher returned from applying {@code func} to the most recently emitted item emitted by the source Publisher * @see ReactiveX operators documentation: FlatMap + * @see #switchMapDelayError(Function) */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -15156,6 +15156,7 @@ public final Flowable switchMap(FunctionReactiveX operators documentation: FlatMap + * @see #switchMapDelayError(Function, int) */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) @@ -15245,7 +15246,7 @@ public final Completable switchMapCompletable(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #switchMap(Function) * @since 2.0 */ @CheckReturnValue @@ -15320,6 +15322,7 @@ public final Flowable switchMapDelayError(FunctionReactiveX operators documentation: FlatMap + * @see #switchMap(Function, int) * @since 2.0 */ @CheckReturnValue @@ -15373,6 +15376,7 @@ Flowable switchMap0(Function> * and get subscribed to. * @return the new Flowable instance * @see #switchMapMaybe(Function) + * @see #switchMapMaybeDelayError(Function) * @since 2.2 */ @CheckReturnValue @@ -15444,7 +15448,7 @@ public final Flowable switchMapMaybeDelayError(@NonNull Function contains(final Object element) { * @return a Single that emits a single item: the number of items emitted by the source ObservableSource as a * 64-bit Long item * @see ReactiveX operators documentation: Count - * @see #count() */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -12405,6 +12404,7 @@ public final Observable switchIfEmpty(ObservableSource other) { * ObservableSource * @return an Observable that emits the items emitted by the ObservableSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource * @see ReactiveX operators documentation: FlatMap + * @see #switchMapDelayError(Function) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -12434,6 +12434,7 @@ public final Observable switchMap(FunctionReactiveX operators documentation: FlatMap + * @see #switchMapDelayError(Function, int) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @@ -12524,7 +12525,7 @@ public final Completable switchMapCompletable(@NonNull Function Observable switchMapMaybeDelayError(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #switchMapSingleDelayError(Function) * @since 2.2 */ @CheckReturnValue @@ -12647,6 +12649,7 @@ public final Observable switchMapSingle(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #switchMapSingle(Function) * @since 2.2 */ @CheckReturnValue @@ -12678,6 +12681,7 @@ public final Observable switchMapSingleDelayError(@NonNull FunctionReactiveX operators documentation: FlatMap + * @see #switchMap(Function) * @since 2.0 */ @CheckReturnValue @@ -12709,6 +12713,7 @@ public final Observable switchMapDelayError(FunctionReactiveX operators documentation: FlatMap + * @see #switchMap(Function, int) * @since 2.0 */ @CheckReturnValue diff --git a/src/main/java/io/reactivex/disposables/ActionDisposable.java b/src/main/java/io/reactivex/disposables/ActionDisposable.java index 447dfe2e34..f553f8b58e 100644 --- a/src/main/java/io/reactivex/disposables/ActionDisposable.java +++ b/src/main/java/io/reactivex/disposables/ActionDisposable.java @@ -16,6 +16,9 @@ import io.reactivex.functions.Action; import io.reactivex.internal.util.ExceptionHelper; +/** + * A Disposable container that manages an Action instance. + */ final class ActionDisposable extends ReferenceDisposable { private static final long serialVersionUID = -8219729196779211169L; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java index 298a3f21be..235d8507dc 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableMostRecent.java @@ -42,10 +42,6 @@ public BlockingFlowableMostRecent(Flowable source, T initialValue) { public Iterator iterator() { MostRecentSubscriber mostRecentSubscriber = new MostRecentSubscriber(initialValue); - /** - * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain - * since it is for BlockingObservable. - */ source.subscribe(mostRecentSubscriber); return mostRecentSubscriber.getIterable(); diff --git a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java index 90a603f65c..04940be5e6 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java +++ b/src/main/java/io/reactivex/internal/operators/observable/BlockingObservableMostRecent.java @@ -42,10 +42,6 @@ public BlockingObservableMostRecent(ObservableSource source, T initialValue) public Iterator iterator() { MostRecentObserver mostRecentObserver = new MostRecentObserver(initialValue); - /** - * Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain - * since it is for BlockingObservable. - */ source.subscribe(mostRecentObserver); return mostRecentObserver.getIterable();