Skip to content

Commit

Permalink
Allowing actionable options, bug fixes with regards to timeouts, bett…
Browse files Browse the repository at this point in the history
…er error handling, minor optimizations, improved documentation, added considerably more tests.
  • Loading branch information
MarkCiliaVincenti committed Dec 17, 2022
1 parent e719dac commit f89c81d
Show file tree
Hide file tree
Showing 9 changed files with 973 additions and 221 deletions.
1 change: 1 addition & 0 deletions AsyncKeyedLock.Tests/AsyncKeyedLock.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="6.8.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.1" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace AsyncKeyedLock.Tests
{
public class Tests
public class OriginalTests
{
[Fact]
public async Task BasicTest()
Expand All @@ -19,8 +19,9 @@ public async Task BasicTest()
var key = Convert.ToInt32(Math.Ceiling((double)i / concurrency));
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(100);
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
Expand Down Expand Up @@ -70,8 +71,9 @@ public async Task BasicTestGenerics()
var key = Convert.ToInt32(Math.Ceiling((double)i / concurrency));
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(10);
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
Expand Down Expand Up @@ -121,8 +123,65 @@ public async Task BasicTestGenericsPooling50k()
var key = Convert.ToInt32(Math.Ceiling((double)i / concurrency));
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(10);
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
await Task.WhenAll(tasks.AsParallel());

bool valid = concurrentQueue.Count == locks * concurrency * 2;

var entered = new HashSet<int>();

while (valid && !concurrentQueue.IsEmpty)
{
concurrentQueue.TryDequeue(out var result);
if (result.entered)
{
if (entered.Contains(result.key))
{
valid = false;
break;
}
entered.Add(result.key);
}
else
{
if (!entered.Contains(result.key))
{
valid = false;
break;
}
entered.Remove(result.key);
}
}

Assert.True(valid);
}

[Fact]
public async Task BasicTestGenericsPooling50kUnfilled()
{
var locks = 50_000;
var concurrency = 50;
var asyncKeyedLocker = new AsyncKeyedLocker<int>(o =>
{
o.PoolSize = 50_000;
o.PoolInitialFill = 0;
}, Environment.ProcessorCount, 50_000);
var concurrentQueue = new ConcurrentQueue<(bool entered, int key)>();

var tasks = Enumerable.Range(1, locks * concurrency)
.Select(async i =>
{
var key = Convert.ToInt32(Math.Ceiling((double)i / concurrency));
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
Expand Down Expand Up @@ -172,8 +231,9 @@ public async Task BasicTestGenericsPoolingProcessorCount()
var key = Convert.ToInt32(Math.Ceiling((double)i / concurrency));
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(10);
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
Expand Down Expand Up @@ -223,8 +283,9 @@ public async Task BasicTestGenericsPooling10k()
var key = Convert.ToInt32(Math.Ceiling((double)i / concurrency));
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(10);
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
Expand Down Expand Up @@ -274,8 +335,9 @@ public async Task BasicTestGenericsString()
var key = Convert.ToInt32(Math.Ceiling((double)i / 5)).ToString();
using (await asyncKeyedLocker.LockAsync(key))
{
await Task.Delay(20);
concurrentQueue.Enqueue((true, key));
await Task.Delay(100);
await Task.Delay(80);
concurrentQueue.Enqueue((false, key));
}
});
Expand Down
225 changes: 225 additions & 0 deletions AsyncKeyedLock.Tests/TestsForAsyncKeyedLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
using FluentAssertions;
using System.Collections.Concurrent;
using Xunit;

namespace AsyncKeyedLock.Tests;

/// <summary>
/// Adapted from https://github.com/amoerie/keyed-semaphores/blob/main/KeyedSemaphores.Tests/TestsForKeyedSemaphore.cs
/// </summary>
public class TestsForAsyncKeyedLock
{
private static readonly AsyncKeyedLocker<string> _asyncKeyedLocker = new AsyncKeyedLocker<string>();

public class Async : TestsForAsyncKeyedLock
{
[Fact]
public async Task ShouldRunThreadsWithDistinctKeysInParallel()
{
// Arrange
var currentParallelism = 0;
var maxParallelism = 0;
var parallelismLock = new object();

// 100 threads, 100 keys
var threads = Enumerable.Range(0, 100)
.Select(i => Task.Run(async () => await OccupyTheLockALittleBit(i).ConfigureAwait(false)))
.ToList();

// Act
await Task.WhenAll(threads).ConfigureAwait(false);

maxParallelism.Should().BeGreaterThan(10);

async Task OccupyTheLockALittleBit(int key)
{
using (await _asyncKeyedLocker.LockAsync(key.ToString()))
{
var incrementedCurrentParallelism = Interlocked.Increment(ref currentParallelism);

lock (parallelismLock)
{
maxParallelism = Math.Max(incrementedCurrentParallelism, maxParallelism);
}

const int delay = 250;

await Task.Delay(TimeSpan.FromMilliseconds(delay)).ConfigureAwait(false);

Interlocked.Decrement(ref currentParallelism);
}
}
}

[Fact]
public async Task ShouldRunThreadsWithSameKeysLinearly()
{
// Arrange
var runningTasksIndex = new ConcurrentDictionary<int, int>();
var parallelismLock = new object();
var currentParallelism = 0;
var maxParallelism = 0;

// 100 threads, 10 keys
var threads = Enumerable.Range(0, 100)
.Select(i => Task.Run(async () => await OccupyTheLockALittleBit(i % 10).ConfigureAwait(false)))
.ToList();

// Act + Assert
await Task.WhenAll(threads).ConfigureAwait(false);

maxParallelism.Should().BeLessOrEqualTo(10);

async Task OccupyTheLockALittleBit(int key)
{
using (await _asyncKeyedLocker.LockAsync(key.ToString()))
{
var incrementedCurrentParallelism = Interlocked.Increment(ref currentParallelism);


lock (parallelismLock)
{
maxParallelism = Math.Max(incrementedCurrentParallelism, maxParallelism);
}

var currentTaskId = Task.CurrentId ?? -1;
if (runningTasksIndex.TryGetValue(key, out var otherThread))
throw new Exception($"Thread #{currentTaskId} acquired a lock using key ${key} " +
$"but another thread #{otherThread} is also still running using this key!");

runningTasksIndex[key] = currentTaskId;

const int delay = 10;

await Task.Delay(TimeSpan.FromMilliseconds(delay)).ConfigureAwait(false);

if (!runningTasksIndex.TryRemove(key, out var value))
{
var ex = new Exception($"Thread #{currentTaskId} has finished " +
"but when trying to cleanup the running threads index, the value is already gone");

throw ex;
}

if (value != currentTaskId)
{
var ex = new Exception($"Thread #{currentTaskId} has finished and has removed itself from the running threads index," +
$" but that index contained an incorrect value: #{value}!");

throw ex;
}

Interlocked.Decrement(ref currentParallelism);
}
}
}
}

public class Sync : TestsForAsyncKeyedLock
{
[Fact]
public void ShouldRunThreadsWithDistinctKeysInParallel()
{
// Arrange
var currentParallelism = 0;
var maxParallelism = 0;
var parallelismLock = new object();

// 100 threads, 100 keys
var threads = Enumerable.Range(0, 100)
.Select(i => new Thread(() => OccupyTheLockALittleBit(i)))
.ToList();

// Act
foreach (var thread in threads) thread.Start();

foreach (var thread in threads) thread.Join();

maxParallelism.Should().BeGreaterThan(10);

void OccupyTheLockALittleBit(int key)
{
using (_asyncKeyedLocker.Lock(key.ToString()))
{
var incrementedCurrentParallelism = Interlocked.Increment(ref currentParallelism);

lock (parallelismLock)
{
maxParallelism = Math.Max(incrementedCurrentParallelism, maxParallelism);
}

const int delay = 250;

Thread.Sleep(TimeSpan.FromMilliseconds(delay));

Interlocked.Decrement(ref currentParallelism);
}
}
}

[Fact]
public void ShouldRunThreadsWithSameKeysLinearly()
{
// Arrange
var runningThreadsIndex = new ConcurrentDictionary<int, int>();
var parallelismLock = new object();
var currentParallelism = 0;
var maxParallelism = 0;

// 100 threads, 10 keys
var threads = Enumerable.Range(0, 100)
.Select(i => new Thread(() => OccupyTheLockALittleBit(i % 10)))
.ToList();

// Act
foreach (var thread in threads) thread.Start();

foreach (var thread in threads) thread.Join();

// Assert
maxParallelism.Should().BeLessOrEqualTo(10);

void OccupyTheLockALittleBit(int key)
{
using (_asyncKeyedLocker.Lock(key.ToString()))
{
var incrementedCurrentParallelism = Interlocked.Increment(ref currentParallelism);

lock (parallelismLock)
{
maxParallelism = Math.Max(incrementedCurrentParallelism, maxParallelism);
}

var currentThreadId = Thread.CurrentThread.ManagedThreadId;
if (runningThreadsIndex.TryGetValue(key, out var otherThread))
throw new Exception($"Thread #{currentThreadId} acquired a lock using key ${key} " +
$"but another thread #{otherThread} is also still running using this key!");

runningThreadsIndex[key] = currentThreadId;

const int delay = 10;

Thread.Sleep(TimeSpan.FromMilliseconds(delay));

if (!runningThreadsIndex.TryRemove(key, out var value))
{
var ex = new Exception($"Thread #{currentThreadId} has finished " +
"but when trying to cleanup the running threads index, the value is already gone");

throw ex;
}

if (value != currentThreadId)
{
var ex = new Exception($"Thread #{currentThreadId} has finished and has removed itself from the running threads index," +
$" but that index contained an incorrect value: #{value}!");

throw ex;
}

Interlocked.Decrement(ref currentParallelism);
}
}
}
}
}
Loading

0 comments on commit f89c81d

Please sign in to comment.