Skip to content

Commit

Permalink
backporting #6729 to 2.x branch. (#6746)
Browse files Browse the repository at this point in the history
  • Loading branch information
ddunig2 authored and akarnokd committed Dec 6, 2019
1 parent 56d6e91 commit 31b407f
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 14 deletions.
10 changes: 7 additions & 3 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -8266,7 +8266,6 @@ public final Single<Boolean> contains(final Object item) {
* @return a Single that emits a single item: the number of items emitted by the source Publisher as a
* 64-bit Long item
* @see <a href="http://reactivex.io/documentation/operators/count.html">ReactiveX operators documentation: Count</a>
* @see #count()
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
Expand Down Expand Up @@ -15121,6 +15120,7 @@ public final Flowable<T> switchIfEmpty(Publisher<? extends T> other) {
* Publisher
* @return a Flowable that emits the items emitted by the Publisher returned from applying {@code func} to the most recently emitted item emitted by the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMapDelayError(Function)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
Expand Down Expand Up @@ -15156,6 +15156,7 @@ public final <R> Flowable<R> switchMap(Function<? super T, ? extends Publisher<?
* the number of elements to prefetch from the current active inner Publisher
* @return a Flowable that emits the items emitted by the Publisher returned from applying {@code func} to the most recently emitted item emitted by the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMapDelayError(Function, int)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.FULL)
Expand Down Expand Up @@ -15245,7 +15246,7 @@ public final Completable switchMapCompletable(@NonNull Function<? super T, ? ext
* {@link CompletableSource} to be subscribed to and awaited for
* (non blockingly) for its terminal event
* @return the new Completable instance
* @see #switchMapCompletableDelayError(Function)
* @see #switchMapCompletable(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down Expand Up @@ -15283,6 +15284,7 @@ public final Completable switchMapCompletableDelayError(@NonNull Function<? supe
* Publisher
* @return a Flowable that emits the items emitted by the Publisher returned from applying {@code func} to the most recently emitted item emitted by the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMap(Function)
* @since 2.0
*/
@CheckReturnValue
Expand Down Expand Up @@ -15320,6 +15322,7 @@ public final <R> Flowable<R> switchMapDelayError(Function<? super T, ? extends P
* the number of elements to prefetch from the current active inner Publisher
* @return a Flowable that emits the items emitted by the Publisher returned from applying {@code func} to the most recently emitted item emitted by the source Publisher
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMap(Function, int)
* @since 2.0
*/
@CheckReturnValue
Expand Down Expand Up @@ -15373,6 +15376,7 @@ <R> Flowable<R> switchMap0(Function<? super T, ? extends Publisher<? extends R>>
* and get subscribed to.
* @return the new Flowable instance
* @see #switchMapMaybe(Function)
* @see #switchMapMaybeDelayError(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down Expand Up @@ -15444,7 +15448,7 @@ public final <R> Flowable<R> switchMapMaybeDelayError(@NonNull Function<? super
* return a {@code SingleSource} to replace the current active inner source
* and get subscribed to.
* @return the new Flowable instance
* @see #switchMapSingle(Function)
* @see #switchMapSingleDelayError(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down
11 changes: 8 additions & 3 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7283,7 +7283,6 @@ public final Single<Boolean> contains(final Object element) {
* @return a Single that emits a single item: the number of items emitted by the source ObservableSource as a
* 64-bit Long item
* @see <a href="http://reactivex.io/documentation/operators/count.html">ReactiveX operators documentation: Count</a>
* @see #count()
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -12405,6 +12404,7 @@ public final Observable<T> switchIfEmpty(ObservableSource<? extends T> other) {
* ObservableSource
* @return an Observable that emits the items emitted by the ObservableSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMapDelayError(Function)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -12434,6 +12434,7 @@ public final <R> Observable<R> switchMap(Function<? super T, ? extends Observabl
* the number of elements to prefetch from the current active inner ObservableSource
* @return an Observable that emits the items emitted by the ObservableSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMapDelayError(Function, int)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down Expand Up @@ -12524,7 +12525,7 @@ public final Completable switchMapCompletable(@NonNull Function<? super T, ? ext
* {@link CompletableSource} to be subscribed to and awaited for
* (non blockingly) for its terminal event
* @return the new Completable instance
* @see #switchMapCompletableDelayError(Function)
* @see #switchMapCompletable(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down Expand Up @@ -12560,7 +12561,7 @@ public final Completable switchMapCompletableDelayError(@NonNull Function<? supe
* return a {@code MaybeSource} to replace the current active inner source
* and get subscribed to.
* @return the new Observable instance
* @see #switchMapMaybe(Function)
* @see #switchMapMaybeDelayError(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down Expand Up @@ -12616,6 +12617,7 @@ public final <R> Observable<R> switchMapMaybeDelayError(@NonNull Function<? supe
* SingleSource
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMapSingleDelayError(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down Expand Up @@ -12647,6 +12649,7 @@ public final <R> Observable<R> switchMapSingle(@NonNull Function<? super T, ? ex
* SingleSource
* @return an Observable that emits the item emitted by the SingleSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMapSingle(Function)
* @since 2.2
*/
@CheckReturnValue
Expand Down Expand Up @@ -12678,6 +12681,7 @@ public final <R> Observable<R> switchMapSingleDelayError(@NonNull Function<? sup
* ObservableSource
* @return an Observable that emits the items emitted by the ObservableSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMap(Function)
* @since 2.0
*/
@CheckReturnValue
Expand Down Expand Up @@ -12709,6 +12713,7 @@ public final <R> Observable<R> switchMapDelayError(Function<? super T, ? extends
* the number of elements to prefetch from the current active inner ObservableSource
* @return an Observable that emits the items emitted by the ObservableSource returned from applying {@code func} to the most recently emitted item emitted by the source ObservableSource
* @see <a href="http://reactivex.io/documentation/operators/flatmap.html">ReactiveX operators documentation: FlatMap</a>
* @see #switchMap(Function, int)
* @since 2.0
*/
@CheckReturnValue
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/io/reactivex/disposables/ActionDisposable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
import io.reactivex.functions.Action;
import io.reactivex.internal.util.ExceptionHelper;

/**
* A Disposable container that manages an Action instance.
*/
final class ActionDisposable extends ReferenceDisposable<Action> {

private static final long serialVersionUID = -8219729196779211169L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ public BlockingFlowableMostRecent(Flowable<T> source, T initialValue) {
public Iterator<T> iterator() {
MostRecentSubscriber<T> mostRecentSubscriber = new MostRecentSubscriber<T>(initialValue);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentSubscriber);

return mostRecentSubscriber.getIterable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ public BlockingObservableMostRecent(ObservableSource<T> source, T initialValue)
public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return mostRecentObserver.getIterable();
Expand Down

0 comments on commit 31b407f

Please sign in to comment.