Skip to content

Commit

Permalink
Merge branch 'dont-ignore-connection-timeout' into 'master'
Browse files Browse the repository at this point in the history
Don't use infinite connection timeout in strategies.

See merge request vostok-libraries/clusterclient.core!2
  • Loading branch information
Гладышева Татьяна Владимировна committed Apr 2, 2024
2 parents 7245055 + 374a3e1 commit 555b28f
Show file tree
Hide file tree
Showing 8 changed files with 16 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,21 +211,21 @@ public void Should_launch_parallel_requests_with_correct_parameters()
}

[Test]
public void Should_launch_requests_except_last_with_connection_timeout()
public void Should_launch_all_requests_with_configured_connection_timeout()
{
sender.ClearReceivedCalls();

strategy = new ForkingRequestStrategy(delaysProvider, delaysPlanner, replicas.Length);

strategy.SendAsync(request, parameters, sender, Budget.WithRemaining(5.Seconds()), replicas, replicas.Length, token);

for (var i = 0; i < replicas.Length; ++i)
CompleteForkingDelay();

for (var i = 0; i < replicas.Length - 1; ++i)
sender.Received(1).SendToReplicaAsync(replicas[i], Arg.Any<Request>(), parameters.ConnectionTimeout, Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
sender.Received(1).SendToReplicaAsync(replicas.Last(), Arg.Any<Request>(), null, Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());

sender.Received(1).SendToReplicaAsync(replicas.Last(), Arg.Any<Request>(), parameters.ConnectionTimeout, Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
}

[TestCase(0)]
Expand All @@ -243,7 +243,7 @@ public void Should_stop_when_any_of_requests_completes_with_accepted_result(int
}

[Test]
public void Should_issue_another_request_when_a_pending_one_ends_with_rejected_status([Values]bool unreliableHeaderPresent)
public void Should_issue_another_request_when_a_pending_one_ends_with_rejected_status([Values] bool unreliableHeaderPresent)
{
var task = strategy.SendAsync(request, parameters, sender, Budget.Infinite, replicas, replicas.Length, token);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void Should_fire_initial_requests_to_all_replicas_if_parallelism_level_is
}

[Test]
public void Should_ignore_connection_timeout()
public void Should_use_configured_connection_timeout_for_all_requests()
{
strategy = new ParallelRequestStrategy(int.MaxValue);

Expand All @@ -158,7 +158,7 @@ public void Should_ignore_connection_timeout()

foreach (var replica in replicas)
{
sender.Received(1).SendToReplicaAsync(replica, Arg.Any<Request>(), null, Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
sender.Received(1).SendToReplicaAsync(replica, Arg.Any<Request>(), parameters.ConnectionTimeout, Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ public void Should_limit_request_timeouts_by_remaining_time_budget()
}

[Test]
public void Should_not_use_connection_timeout_for_last_attempt()
public void Should_use_configured_connection_timeout()
{
Send(Budget.WithRemaining(1500.Milliseconds()));

sender.ReceivedCalls().Should().HaveCount(3);
sender.Received(1).SendToReplicaAsync(replica1, request, parameters.ConnectionTimeout, Arg.Any<TimeSpan>(), token);
sender.Received(1).SendToReplicaAsync(replica2, request, parameters.ConnectionTimeout, Arg.Any<TimeSpan>(), token);
sender.Received(1).SendToReplicaAsync(replica3, request, null, Arg.Any<TimeSpan>(), token);
sender.Received(1).SendToReplicaAsync(replica3, request, parameters.ConnectionTimeout, Arg.Any<TimeSpan>(), token);
}

[Test]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,20 +49,9 @@ public void Should_send_request_to_first_replica_with_correct_parameters()

strategy.SendAsync(request, parameters, sender, budget, replicas, replicas.Length, token).Wait();

sender.Received().SendToReplicaAsync(replicas[0], request, null, budget.Remaining, token);
}

[Test]
public void Should_ignore_connection_timeout()
{
parameters = parameters.WithConnectionTimeout(5.Seconds());

var token = new CancellationTokenSource().Token;

strategy.SendAsync(request, parameters, sender, budget, replicas, replicas.Length, token).Wait();

sender.Received().SendToReplicaAsync(Arg.Any<Uri>(), Arg.Any<Request>(), null, Arg.Any<TimeSpan>(), Arg.Any<CancellationToken>());
sender.Received().SendToReplicaAsync(replicas[0], request, parameters.ConnectionTimeout, budget.Remaining, token);
}


[Test]
public void Should_stop_on_first_result_if_its_response_is_accepted()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ public async Task SendAsync(Request request, RequestParameters parameters, IRequ
break;
}

var connectionAttemptTimeout = i == replicasCount - 1 ? null : parameters.ConnectionTimeout;

LaunchRequest(currentTasks, request, budget, sender, replicasEnumerator, connectionAttemptTimeout, linkedCancellationToken);
LaunchRequest(currentTasks, request, budget, sender, replicasEnumerator, parameters.ConnectionTimeout, linkedCancellationToken);

ScheduleForkIfNeeded(currentTasks, request, budget, i, replicasCount, linkedCancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public async Task SendAsync(Request request, RequestParameters parameters, IRequ
if (!replicasEnumerator.MoveNext())
throw new InvalidOperationException("Replicas enumerator ended prematurely. This is definitely a bug in code.");

currentTasks.Add(sender.SendToReplicaAsync(replicasEnumerator.Current, request, null, budget.Remaining, linkedCancellationToken));
currentTasks.Add(sender.SendToReplicaAsync(replicasEnumerator.Current, request, parameters.ConnectionTimeout, budget.Remaining, linkedCancellationToken));
}

while (currentTasks.Count > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,7 @@ public async Task SendAsync(Request request, RequestParameters parameters, IRequ

var timeout = TimeSpanArithmetics.Min(timeoutsProvider.GetTimeout(request, budget, currentReplicaIndex++, replicasCount), budget.Remaining);

var connectionAttemptTimeout = currentReplicaIndex == replicasCount ? null : parameters.ConnectionTimeout;

var result = await sender.SendToReplicaAsync(replica, request, connectionAttemptTimeout, timeout, cancellationToken).ConfigureAwait(false);
var result = await sender.SendToReplicaAsync(replica, request, parameters.ConnectionTimeout, timeout, cancellationToken).ConfigureAwait(false);
if (result.Verdict == ResponseVerdict.Accept)
break;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class SingleReplicaRequestStrategy : IRequestStrategy
{
/// <inheritdoc />
public Task SendAsync(Request request, RequestParameters parameters, IRequestSender sender, IRequestTimeBudget budget, IEnumerable<Uri> replicas, int replicasCount, CancellationToken cancellationToken) =>
sender.SendToReplicaAsync(replicas.First(), request, null, budget.Remaining, cancellationToken);
sender.SendToReplicaAsync(replicas.First(), request, parameters.ConnectionTimeout, budget.Remaining, cancellationToken);

/// <inheritdoc />
public override string ToString() => "SingleReplica";
Expand Down

0 comments on commit 555b28f

Please sign in to comment.