Skip to content

Commit

Permalink
3.x: Workaround for FutureTask.toString + JDK 11 build (#7173)
Browse files Browse the repository at this point in the history
  • Loading branch information
akarnokd authored Feb 1, 2021
1 parent 8dd6f21 commit 4d325c6
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 2 deletions.
33 changes: 33 additions & 0 deletions .github/workflows/gradle_jdk11.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# This workflow will build a Java project with Gradle
# For more information see: https://help.github.com/actions/language-and-framework-guides/building-and-testing-java-with-gradle

name: JDK 11

on:
push:
branches: [ 3.x ]
pull_request:
branches: [ 3.x ]

jobs:
build:

runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up JDK 11
uses: actions/setup-java@v1
with:
java-version: 11
- name: Cache Gradle packages
uses: actions/cache@v2
with:
path: ~/.gradle/caches
key: ${{ runner.os }}-gradle-${{ hashFiles('**/*.gradle') }}
restore-keys: ${{ runner.os }}-gradle
- name: Grant execute permission for gradlew
run: chmod +x gradlew
- name: Build PR
run: ./gradlew -PreleaseMode=pr build --stacktrace
#- name: Upload to Codecov
# uses: codecov/codecov-action@v1
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,24 @@ public final void setFuture(Future<?> future) {
public Runnable getWrappedRunnable() {
return runnable;
}

@Override
public String toString() {
String status;
Future<?> f = get();
if (f == FINISHED) {
status = "Finished";
} else if (f == DISPOSED) {
status = "Disposed";
} else {
Thread r = runner;
if (r != null) {
status = "Running on " + runner;
} else {
status = "Waiting";
}
}

return getClass().getSimpleName() + "[" + status + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ public void run() {
runner = null;
} catch (Throwable ex) {
// Exceptions.throwIfFatal(ex); nowhere to go
runner = null;
dispose();
runner = null;
RxJavaPlugins.onError(ex);
throw ex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ public void run() {
throw e;
}
} finally {
lazySet(THREAD_INDEX, null);
Object o = get(PARENT_INDEX);
if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
((DisposableContainer)o).delete(this);
Expand All @@ -81,6 +80,7 @@ public void run() {
break;
}
}
lazySet(THREAD_INDEX, null);
}
}

Expand Down Expand Up @@ -137,4 +137,26 @@ public boolean isDisposed() {
Object o = get(PARENT_INDEX);
return o == PARENT_DISPOSED || o == DONE;
}

@Override
public String toString() {
String state;
Object o = get(FUTURE_INDEX);
if (o == DONE) {
state = "Finished";
} else if (o == SYNC_DISPOSED) {
state = "Disposed(Sync)";
} else if (o == ASYNC_DISPOSED) {
state = "Disposed(Async)";
} else {
o = get(THREAD_INDEX);
if (o == null) {
state = "Waiting";
} else {
state = "Running on " + o;
}
}

return getClass().getSimpleName() + "[" + state + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,31 @@ public void run() {
TestHelper.race(r1, r2);
}
}

static class TestDirectTask extends AbstractDirectTask {
private static final long serialVersionUID = 587679821055711738L;

TestDirectTask() {
super(Functions.EMPTY_RUNNABLE);
}
}

@Test
public void toStringStates() {
TestDirectTask task = new TestDirectTask();

assertEquals("TestDirectTask[Waiting]", task.toString());

task.runner = Thread.currentThread();

assertEquals("TestDirectTask[Running on " + Thread.currentThread() + "]", task.toString());

task.dispose();

assertEquals("TestDirectTask[Disposed]", task.toString());

task.set(AbstractDirectTask.FINISHED);

assertEquals("TestDirectTask[Finished]", task.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,29 @@ public void withParentIsDisposed() {

assertFalse(set.remove(run));
}

@Test
public void toStringStates() {
CompositeDisposable set = new CompositeDisposable();
ScheduledRunnable task = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set);

assertEquals("ScheduledRunnable[Waiting]", task.toString());

task.set(ScheduledRunnable.THREAD_INDEX, Thread.currentThread());

assertEquals("ScheduledRunnable[Running on " + Thread.currentThread() + "]", task.toString());

task.dispose();

assertEquals("ScheduledRunnable[Disposed(Sync)]", task.toString());

task.set(ScheduledRunnable.FUTURE_INDEX, ScheduledRunnable.DONE);

assertEquals("ScheduledRunnable[Finished]", task.toString());

task = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set);
task.dispose();

assertEquals("ScheduledRunnable[Disposed(Async)]", task.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.core.Scheduler.Worker;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.disposables.SequentialDisposable;
Expand Down Expand Up @@ -771,4 +772,51 @@ public void schedulePeriodicallyDirectNullRunnable() {
assertEquals("run is null", npe.getMessage());
}
}

void schedulePrint(Function<Runnable, Disposable> onSchedule) {
CountDownLatch waitForBody = new CountDownLatch(1);
CountDownLatch waitForPrint = new CountDownLatch(1);

try {
Disposable d = onSchedule.apply(() -> {
waitForBody.countDown();
try {
waitForPrint.await();
} catch (InterruptedException ex) {
ex.printStackTrace();
}
});

waitForBody.await();

assertNotEquals("", d.toString());
} catch (Throwable ex) {
throw new AssertionError(ex);
} finally {
waitForPrint.countDown();
}
}

@Test
public void scheduleDirectPrint() {
if (getScheduler() instanceof TrampolineScheduler) {
// no concurrency with Trampoline
return;
}
schedulePrint(r -> getScheduler().scheduleDirect(r));
}

@Test
public void schedulePrint() {
if (getScheduler() instanceof TrampolineScheduler) {
// no concurrency with Trampoline
return;
}
Worker worker = getScheduler().createWorker();
try {
schedulePrint(worker::schedule);
} finally {
worker.dispose();
}
}
}

0 comments on commit 4d325c6

Please sign in to comment.