diff --git a/bolts-tasks/build.gradle b/bolts-tasks/build.gradle index 5731294..f40a7d5 100644 --- a/bolts-tasks/build.gradle +++ b/bolts-tasks/build.gradle @@ -16,6 +16,8 @@ sourceSets { dependencies { provided 'com.google.android:android:4.1.1.4' + compile 'io.reactivex:rxjava:1.1.5' + testCompile 'junit:junit:4.12' } diff --git a/bolts-tasks/src/main/java/bolts/Task.java b/bolts-tasks/src/main/java/bolts/Task.java index d6bf6a2..2105a99 100644 --- a/bolts-tasks/src/main/java/bolts/Task.java +++ b/bolts-tasks/src/main/java/bolts/Task.java @@ -23,6 +23,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import rx.Completable; +import rx.Observable; +import rx.Subscriber; +import rx.functions.Func0; + /** * Represents the result of an asynchronous operation. * @@ -1015,6 +1020,60 @@ private void runContinuations() { } } + /** + * creates an Rx Observable or Rx Completable from a Parse Task + * + * @param isNullable if the task is nullable this will get used to generate an {@link Completable} + * @return {@link Observable} + */ + private Observable asObservable(boolean isNullable) { + return Observable.defer(new Func0>() { + @Override + public Observable call() { + return Observable.create(new Observable.OnSubscribe() { + @Override + public void call(Subscriber subscriber) { + continueWith(new Continuation() { + @Override + public Object then(Task task) throws Exception { + if (task.isCancelled()) { +// NOTICE: doOnUnsubscribe(() -> Observable.just(query) in outside + subscriber.unsubscribe(); //sub.onCompleted();? + } else if (task.isFaulted()) { + Throwable error = task.getError(); + subscriber.onError(error); + } else { + TResult result = task.getResult(); + if (isNullable || result != null) subscriber.onNext(result); + subscriber.onCompleted(); + } + return null; + } + }); + } + }); + } + }); + } + + /** + * creates an Rx Observable from a Parse Task + * + * @return {@link Observable} + */ + public Observable asObservable() { + return asObservable(false); + } + + /** + * creates an Rx Completable from a Parse Task + * + * @return {@link Completable} + */ + public Completable asCompletable() { + return asObservable(true).toCompletable(); + } + /** * @deprecated Please use {@link bolts.TaskCompletionSource} instead. */