Skip to content

Commit

Permalink
2.x: Zip, CombineLatest, and Amb operators throw when supplied with O…
Browse files Browse the repository at this point in the history
…bservableSource implementation that doesn't subclass Observable (#6754)

* 2.x: Zip, CombineLatest, and Amb operators throw when supplied with ObservableSource implementation that doesn't subclass Observable #6753

* 2.x: add tests for allowing arbitrary ObservableSource implementations
  • Loading branch information
mgsholte authored and akarnokd committed Dec 11, 2019
1 parent 31b407f commit 52346a1
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public void subscribeActual(Observer<? super T> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
sources = new ObservableSource[8];
try {
for (ObservableSource<? extends T> p : sourcesIterable) {
if (p == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
sources = new ObservableSource[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void subscribeActual(Observer<? super R> observer) {
ObservableSource<? extends T>[] sources = this.sources;
int count = 0;
if (sources == null) {
sources = new Observable[8];
sources = new ObservableSource[8];
for (ObservableSource<? extends T> p : sourcesIterable) {
if (count == sources.length) {
ObservableSource<? extends T>[] b = new ObservableSource[count + (count >> 2)];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,23 @@ public void singleIterable() {
.assertResult(1);
}

/**
* Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable
*/
@Test
public void singleIterableNotSubclassingObservable() {
final ObservableSource<Integer> s1 = new ObservableSource<Integer>() {
@Override
public void subscribe (final Observer<? super Integer> observer) {
Observable.just(1).subscribe(observer);
}
};

Observable.amb(Collections.singletonList(s1))
.test()
.assertResult(1);
}

@SuppressWarnings("unchecked")
@Test
public void disposed() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,34 @@ public Object apply(Object[] a) throws Exception {
.assertResult("[1, 2]");
}

/**
* Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable
*/
@Test
public void combineLatestIterableOfSourcesNotSubclassingObservable() {
final ObservableSource<Integer> s1 = new ObservableSource<Integer>() {
@Override
public void subscribe (final Observer<? super Integer> observer) {
Observable.just(1).subscribe(observer);
}
};
final ObservableSource<Integer> s2 = new ObservableSource<Integer>() {
@Override
public void subscribe (final Observer<? super Integer> observer) {
Observable.just(2).subscribe(observer);
}
};

Observable.combineLatest(Arrays.asList(s1, s2), new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return Arrays.toString(a);
}
})
.test()
.assertResult("[1, 2]");
}

@Test
@SuppressWarnings("unchecked")
public void combineLatestDelayErrorArrayOfSources() {
Expand Down Expand Up @@ -1216,4 +1244,5 @@ public Object apply(Object[] a) throws Exception {
.awaitDone(5, TimeUnit.SECONDS)
.assertFailure(TestException.class, 42);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1350,6 +1350,34 @@ public Object apply(Object[] a) throws Exception {
.assertResult("[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]");
}

/**
* Ensures that an ObservableSource implementation can be supplied that doesn't subclass Observable
*/
@Test
public void zipIterableNotSubclassingObservable() {
final ObservableSource<Integer> s1 = new ObservableSource<Integer>() {
@Override
public void subscribe (final Observer<? super Integer> observer) {
Observable.just(1).subscribe(observer);
}
};
final ObservableSource<Integer> s2 = new ObservableSource<Integer>() {
@Override
public void subscribe (final Observer<? super Integer> observer) {
Observable.just(2).subscribe(observer);
}
};

Observable.zip(Arrays.asList(s1, s2), new Function<Object[], Object>() {
@Override
public Object apply(Object[] a) throws Exception {
return Arrays.toString(a);
}
})
.test()
.assertResult("[1, 2]");
}

@Test
public void dispose() {
TestHelper.checkDisposed(Observable.zip(Observable.just(1), Observable.just(1), new BiFunction<Integer, Integer, Object>() {
Expand Down Expand Up @@ -1457,4 +1485,5 @@ public Object apply(Object[] a) throws Exception {

assertEquals(0, counter.get());
}

}

0 comments on commit 52346a1

Please sign in to comment.