From 52346a140c29b659023f87e348c26889b6b39b13 Mon Sep 17 00:00:00 2001 From: Mark Sholte Date: Wed, 11 Dec 2019 02:20:42 -0600 Subject: [PATCH] 2.x: Zip, CombineLatest, and Amb operators throw when supplied with ObservableSource implementation that doesn't subclass Observable (#6754) * 2.x: Zip, CombineLatest, and Amb operators throw when supplied with ObservableSource implementation that doesn't subclass Observable #6753 * 2.x: add tests for allowing arbitrary ObservableSource implementations --- .../operators/observable/ObservableAmb.java | 2 +- .../observable/ObservableCombineLatest.java | 2 +- .../operators/observable/ObservableZip.java | 2 +- .../observable/ObservableAmbTest.java | 17 +++++++++++ .../ObservableCombineLatestTest.java | 29 +++++++++++++++++++ .../observable/ObservableZipTest.java | 29 +++++++++++++++++++ 6 files changed, 78 insertions(+), 3 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java index 2ed4fcd93b..53068e7a91 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableAmb.java @@ -36,7 +36,7 @@ public void subscribeActual(Observer observer) { ObservableSource[] sources = this.sources; int count = 0; if (sources == null) { - sources = new Observable[8]; + sources = new ObservableSource[8]; try { for (ObservableSource p : sourcesIterable) { if (p == null) { diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java index ccbfc8e6d3..56b62cdfeb 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java @@ -49,7 +49,7 @@ public void subscribeActual(Observer observer) { ObservableSource[] sources = this.sources; int count = 0; if (sources == null) { - sources = new Observable[8]; + sources = new ObservableSource[8]; for (ObservableSource p : sourcesIterable) { if (count == sources.length) { ObservableSource[] b = new ObservableSource[count + (count >> 2)]; diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java index af465c43c6..7f00383834 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableZip.java @@ -50,7 +50,7 @@ public void subscribeActual(Observer observer) { ObservableSource[] sources = this.sources; int count = 0; if (sources == null) { - sources = new Observable[8]; + sources = new ObservableSource[8]; for (ObservableSource p : sourcesIterable) { if (count == sources.length) { ObservableSource[] b = new ObservableSource[count + (count >> 2)]; diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java index 6e2c737f75..a76b8f22e6 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableAmbTest.java @@ -264,6 +264,23 @@ public void singleIterable() { .assertResult(1); } + /** + * Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable + */ + @Test + public void singleIterableNotSubclassingObservable() { + final ObservableSource s1 = new ObservableSource() { + @Override + public void subscribe (final Observer observer) { + Observable.just(1).subscribe(observer); + } + }; + + Observable.amb(Collections.singletonList(s1)) + .test() + .assertResult(1); + } + @SuppressWarnings("unchecked") @Test public void disposed() { diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java index 418a19678e..736a0bd9b5 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java @@ -787,6 +787,34 @@ public Object apply(Object[] a) throws Exception { .assertResult("[1, 2]"); } + /** + * Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable + */ + @Test + public void combineLatestIterableOfSourcesNotSubclassingObservable() { + final ObservableSource s1 = new ObservableSource() { + @Override + public void subscribe (final Observer observer) { + Observable.just(1).subscribe(observer); + } + }; + final ObservableSource s2 = new ObservableSource() { + @Override + public void subscribe (final Observer observer) { + Observable.just(2).subscribe(observer); + } + }; + + Observable.combineLatest(Arrays.asList(s1, s2), new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return Arrays.toString(a); + } + }) + .test() + .assertResult("[1, 2]"); + } + @Test @SuppressWarnings("unchecked") public void combineLatestDelayErrorArrayOfSources() { @@ -1216,4 +1244,5 @@ public Object apply(Object[] a) throws Exception { .awaitDone(5, TimeUnit.SECONDS) .assertFailure(TestException.class, 42); } + } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java index 14fda0058d..eca18c71b3 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableZipTest.java @@ -1350,6 +1350,34 @@ public Object apply(Object[] a) throws Exception { .assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]"); } + /** + * Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable + */ + @Test + public void zipIterableNotSubclassingObservable() { + final ObservableSource s1 = new ObservableSource() { + @Override + public void subscribe (final Observer observer) { + Observable.just(1).subscribe(observer); + } + }; + final ObservableSource s2 = new ObservableSource() { + @Override + public void subscribe (final Observer observer) { + Observable.just(2).subscribe(observer); + } + }; + + Observable.zip(Arrays.asList(s1, s2), new Function() { + @Override + public Object apply(Object[] a) throws Exception { + return Arrays.toString(a); + } + }) + .test() + .assertResult("[1, 2]"); + } + @Test public void dispose() { TestHelper.checkDisposed(Observable.zip(Observable.just(1), Observable.just(1), new BiFunction() { @@ -1457,4 +1485,5 @@ public Object apply(Object[] a) throws Exception { assertEquals(0, counter.get()); } + }