Skip to content

Commit

Permalink
Revert "Assign event loop to tasks and run methods from a task in the same event loop"
Browse files Browse the repository at this point in the history

This reverts commit b511e7a.
  • Loading branch information
shangm2 committed Feb 19, 2025
1 parent bfa4972 commit 9612ad5
Show file tree
Hide file tree
Showing 7 changed files with 575 additions and 577 deletions.
6 changes: 0 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,6 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>${dep.netty.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-testing-docker</artifactId>
Expand Down
5 changes: 0 additions & 5 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,6 @@
<optional>true</optional>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>

<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.server.remotetask;

import com.facebook.airlift.concurrent.SetThreadName;
import com.facebook.airlift.http.client.HttpClient;
import com.facebook.airlift.http.client.Request;
import com.facebook.airlift.http.client.ResponseHandler;
Expand All @@ -37,9 +38,13 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import io.netty.channel.EventLoop;

import javax.annotation.concurrent.GuardedBy;

import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
Expand All @@ -55,7 +60,6 @@
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.REMOTE_TASK_MISMATCH;
import static com.facebook.presto.util.Failures.REMOTE_TASK_MISMATCH_ERROR;
import static com.google.common.base.Verify.verify;
import static io.airlift.units.Duration.nanosSince;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
Expand All @@ -71,16 +75,20 @@ class ContinuousTaskStatusFetcher
private final Codec<TaskStatus> taskStatusCodec;

private final Duration refreshMaxWait;
private final EventLoop taskEventLoop;
private final Executor executor;
private final HttpClient httpClient;
private final RequestErrorTracker errorTracker;
private final RemoteTaskStats stats;
private final boolean binaryTransportEnabled;
private final boolean thriftTransportEnabled;
private final Protocol thriftProtocol;
private long currentRequestStartNanos;

private final AtomicLong currentRequestStartNanos = new AtomicLong();

@GuardedBy("this")
private boolean running;

@GuardedBy("this")
private ListenableFuture<BaseResponse<TaskStatus>> future;

public ContinuousTaskStatusFetcher(
Expand All @@ -89,9 +97,10 @@ public ContinuousTaskStatusFetcher(
TaskStatus initialTaskStatus,
Duration refreshMaxWait,
Codec<TaskStatus> taskStatusCodec,
EventLoop taskEventLoop,
Executor executor,
HttpClient httpClient,
Duration maxErrorDuration,
ScheduledExecutorService errorScheduledExecutor,
RemoteTaskStats stats,
boolean binaryTransportEnabled,
boolean thriftTransportEnabled,
Expand All @@ -101,25 +110,23 @@ public ContinuousTaskStatusFetcher(

this.taskId = requireNonNull(taskId, "taskId is null");
this.onFail = requireNonNull(onFail, "onFail is null");
this.taskStatus = new StateMachine<>("task-" + taskId, taskEventLoop, initialTaskStatus);
this.taskStatus = new StateMachine<>("task-" + taskId, executor, initialTaskStatus);

this.refreshMaxWait = requireNonNull(refreshMaxWait, "refreshMaxWait is null");
this.taskStatusCodec = requireNonNull(taskStatusCodec, "taskStatusCodec is null");

this.taskEventLoop = requireNonNull(taskEventLoop, "taskEventLoop is null");
this.executor = requireNonNull(executor, "executor is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");

this.errorTracker = taskRequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, taskEventLoop, "getting task status");
this.errorTracker = taskRequestErrorTracker(taskId, initialTaskStatus.getSelf(), maxErrorDuration, errorScheduledExecutor, "getting task status");
this.stats = requireNonNull(stats, "stats is null");
this.binaryTransportEnabled = binaryTransportEnabled;
this.thriftTransportEnabled = thriftTransportEnabled;
this.thriftProtocol = requireNonNull(thriftProtocol, "thriftProtocol is null");
}

public void start()
public synchronized void start()
{
verify(taskEventLoop.inEventLoop());

if (running) {
// already running
return;
Expand All @@ -128,10 +135,8 @@ public void start()
scheduleNextRequest();
}

public void stop()
public synchronized void stop()
{
verify(taskEventLoop.inEventLoop());

running = false;
if (future != null) {
// do not terminate if the request is already running to avoid closing pooled connections
Expand All @@ -140,10 +145,8 @@ public void stop()
}
}

private void scheduleNextRequest()
private synchronized void scheduleNextRequest()
{
verify(taskEventLoop.inEventLoop());

// stopped or done?
TaskStatus taskStatus = getTaskStatus();
if (!running || taskStatus.getState().isDone()) {
Expand All @@ -160,7 +163,7 @@ private void scheduleNextRequest()
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<?> errorRateLimit = errorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::scheduleNextRequest, taskEventLoop);
errorRateLimit.addListener(this::scheduleNextRequest, executor);
return;
}

Expand All @@ -186,7 +189,7 @@ else if (binaryTransportEnabled) {

errorTracker.startRequest();
future = httpClient.executeAsync(request, responseHandler);
currentRequestStartNanos = System.nanoTime();
currentRequestStartNanos.set(System.nanoTime());
FutureCallback callback;
if (thriftTransportEnabled) {
callback = new ThriftHttpResponseHandler(this, request.getUri(), stats.getHttpResponseStats(), REMOTE_TASK_ERROR);
Expand All @@ -198,7 +201,7 @@ else if (binaryTransportEnabled) {
Futures.addCallback(
future,
callback,
taskEventLoop);
executor);
}

TaskStatus getTaskStatus()
Expand All @@ -209,62 +212,59 @@ TaskStatus getTaskStatus()
@Override
public void success(TaskStatus value)
{
verify(taskEventLoop.inEventLoop());

updateStats(currentRequestStartNanos);
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
updateTaskStatus(value);
errorTracker.requestSucceeded();
}
finally {
scheduleNextRequest();
}
}
}

@Override
public void failed(Throwable cause)
{
verify(taskEventLoop.inEventLoop());

updateStats(currentRequestStartNanos);
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
try {
// if task not already done, record error
TaskStatus taskStatus = getTaskStatus();
if (!taskStatus.getState().isDone()) {
errorTracker.requestFailed(cause);
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}
catch (Error e) {
onFail.accept(e);
throw e;
}
catch (RuntimeException e) {
onFail.accept(e);
}
finally {
scheduleNextRequest();
}
}

@Override
public void fatal(Throwable cause)
{
verify(taskEventLoop.inEventLoop());

updateStats(currentRequestStartNanos);
onFail.accept(cause);
try (SetThreadName ignored = new SetThreadName("ContinuousTaskStatusFetcher-%s", taskId)) {
updateStats(currentRequestStartNanos.get());
onFail.accept(cause);
}
}

void updateTaskStatus(TaskStatus newValue)
{
verify(taskEventLoop.inEventLoop());

// change to new value if old value is not changed and new value has a newer version
AtomicBoolean taskMismatch = new AtomicBoolean();
taskStatus.setIf(newValue, oldValue -> {
// did the task instance id change
boolean isEmpty = (oldValue.getTaskInstanceIdLeastSignificantBits() == 0 && oldValue.getTaskInstanceIdMostSignificantBits() == 0)
|| (newValue.getTaskInstanceIdLeastSignificantBits() == 0 && newValue.getTaskInstanceIdMostSignificantBits() == 0);
boolean isEmpty = oldValue.getTaskInstanceIdLeastSignificantBits() == 0 && oldValue.getTaskInstanceIdMostSignificantBits() == 0;
if (!isEmpty &&
!(oldValue.getTaskInstanceIdLeastSignificantBits() == newValue.getTaskInstanceIdLeastSignificantBits() &&
oldValue.getTaskInstanceIdMostSignificantBits() == newValue.getTaskInstanceIdMostSignificantBits())) {
Expand All @@ -291,6 +291,11 @@ void updateTaskStatus(TaskStatus newValue)
}
}

public synchronized boolean isRunning()
{
return running;
}

/**
* Listener is always notified asynchronously using a dedicated notification thread pool, so care should
* be taken to avoid leaking {@code this} when adding a listener in a constructor. Additionally, it is
Expand All @@ -303,8 +308,6 @@ public void addStateChangeListener(StateMachine.StateChangeListener<TaskStatus>

private void updateStats(long currentRequestStartNanos)
{
verify(taskEventLoop.inEventLoop());

stats.statusRoundTripMillis(nanosSince(currentRequestStartNanos).toMillis());
}
}
Loading

0 comments on commit 9612ad5

Please sign in to comment.