Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix leak #9

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions src/NabtoClient/NabtoClient/Impl/ConnectionEventsListenerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,24 @@
namespace Nabto.Edge.Client.Impl;


internal class ConnectionEventHolder
{
internal ConnectionEventHolderImpl Impl;
internal GCHandle GcHandle;
public ConnectionEventHolder() {
Impl = new ConnectionEventHolderImpl();
GcHandle = GCHandle.Alloc(Impl, GCHandleType.Pinned);
}
~ConnectionEventHolder() {
GcHandle.Free();
}

}

internal class ConnectionEventHolderImpl {
internal int ConnectionEvent;
}

internal class ConnectionEventsListenerImpl : IDisposable, IAsyncDisposable
{
private System.WeakReference<ConnectionImpl> _connection;
Expand Down Expand Up @@ -65,15 +83,14 @@ public async Task startListenEvents()
// Allocate the connectionEvent on the heap such that we can pin it such that the garbage collector is not moving around with the underlying address of the event.
var connectionEventHolder = new ConnectionEventHolder();

GCHandle handle = GCHandle.Alloc(connectionEventHolder, GCHandleType.Pinned);
while (true)
{
AssertListenerIsAlive();
NabtoClientNative.nabto_client_listener_connection_event(_listener.GetHandle(), _connectionEventsFuture.GetHandle(), out connectionEventHolder.ConnectionEvent);
NabtoClientNative.nabto_client_listener_connection_event(_listener.GetHandle(), _connectionEventsFuture.GetHandle(), out connectionEventHolder.Impl.ConnectionEvent);
var ec = await _connectionEventsFuture.WaitAsync();
if (ec == 0)
{
var connectionEvent = connectionEventHolder.ConnectionEvent;
var connectionEvent = connectionEventHolder.Impl.ConnectionEvent;
ConnectionImpl? connection;
if (!_connection.TryGetTarget(out connection))
{
Expand All @@ -100,7 +117,6 @@ public async Task startListenEvents()
{
_listener.Dispose();
_connectionEventsFuture.Dispose();
handle.Free();
return;
}
}
Expand Down
9 changes: 0 additions & 9 deletions src/NabtoClient/NabtoClient/Impl/ConnectionImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,3 @@ protected void Dispose(bool disposing)
}

}

/**
* <summary>This is used to hold a connection event such that we can pin the object, in turn such
* that garbage collection does not change the address of the ConnectionEvent.</summary>
*/
internal class ConnectionEventHolder
{
internal int ConnectionEvent;
}
22 changes: 17 additions & 5 deletions src/NabtoClient/NabtoClient/Impl/MdnsScannerImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,18 @@

namespace Nabto.Edge.Client.Impl;

internal class PinnedMdnsResultPtr {
internal IntPtr Impl;
internal GCHandle GcHandle;
internal PinnedMdnsResultPtr() {
Impl = new IntPtr();
GcHandle = GCHandle.Alloc(Impl, GCHandleType.Pinned);
}
~PinnedMdnsResultPtr() {
GcHandle.Free();
}
}

internal class MdnsResultImpl : Nabto.Edge.Client.MdnsResult
{
public static MdnsResultImpl Create(IntPtr result)
Expand Down Expand Up @@ -92,18 +104,18 @@ public void Stop()
private async void StartListen()
{
// make sure the underlying pointer of the mdnsResult stay the same.
IntPtr mdnsResult = new IntPtr();
GCHandle handle = GCHandle.Alloc(mdnsResult, GCHandleType.Pinned);
PinnedMdnsResultPtr mdnsResult = new PinnedMdnsResultPtr();

while (true)
{
NabtoClientNative.nabto_client_listener_new_mdns_result(_listener.GetHandle(), _future.GetHandle(), out mdnsResult);
NabtoClientNative.nabto_client_listener_new_mdns_result(_listener.GetHandle(), _future.GetHandle(), out mdnsResult.Impl);
var ec = await _future.WaitAsync();

if (ec == 0)
{
MdnsResultImpl r = MdnsResultImpl.Create(mdnsResult);
MdnsResultImpl r = MdnsResultImpl.Create(mdnsResult.Impl);
Handlers?.Invoke(r);
NabtoClientNative.nabto_client_mdns_result_free(mdnsResult);
NabtoClientNative.nabto_client_mdns_result_free(mdnsResult.Impl);
}
else if (ec == NabtoClientError.STOPPED)
{
Expand Down
58 changes: 42 additions & 16 deletions src/NabtoClient/NabtoClient/Impl/StreamImpl.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,59 @@
namespace Nabto.Edge.Client.Impl;

internal class ReadOperation
{
// Wrapper class such that the underlying data object can be pinned.
internal ReadOperationImpl Impl { get; }
internal GCHandle GcHandle;
internal ReadOperation(int bufferSize)
{
Impl = new ReadOperationImpl(bufferSize);
GcHandle = GCHandle.Alloc(Impl, GCHandleType.Pinned);
}
~ReadOperation() {
GcHandle.Free();
}
}

internal class ReadOperationImpl
{

internal IntPtr Buffer { get; }
internal UIntPtr ReadLength;
internal UIntPtr BufferLength;

internal ReadOperation(int bufferSize)
internal ReadOperationImpl(int bufferSize)
{
Buffer = Marshal.AllocHGlobal(bufferSize);
BufferLength = (UIntPtr)bufferSize;
}

~ReadOperation()
~ReadOperationImpl()
{
Marshal.FreeHGlobal(Buffer);
}
}

internal class WriteOperation
internal class WriteOperation {
// Wrapper class such that the underlying data object can be pinned.
internal WriteOperationImpl Impl { get; }
internal GCHandle GcHandle;
internal WriteOperation(byte[] data)
{
Impl = new WriteOperationImpl(data);
GcHandle = GCHandle.Alloc(Impl, GCHandleType.Pinned);
}
~WriteOperation() {
GcHandle.Free();
}
}

internal class WriteOperationImpl
{

internal IntPtr Buffer { get; }
internal UIntPtr BufferLength { get; }
internal WriteOperation(byte[] data)
internal WriteOperationImpl(byte[] data)
{
BufferLength = (UIntPtr)data.Length;
Buffer = Marshal.AllocHGlobal(data.Length);
Expand Down Expand Up @@ -108,19 +137,18 @@ public async Task<byte[]> ReadSomeAsync(int max)
var future = FutureImpl.Create(_client);

var op = new ReadOperation(max);
var gcHandle = GCHandle.Alloc(op, GCHandleType.Pinned);

NabtoClientNative.nabto_client_stream_read_some(GetHandle(), future.GetHandle(), op.Buffer, op.BufferLength, out op.ReadLength);
NabtoClientNative.nabto_client_stream_read_some(GetHandle(), future.GetHandle(), op.Impl.Buffer, op.Impl.BufferLength, out op.Impl.ReadLength);

var ec = await future.WaitAsync();

if (ec == NabtoClientNative.NABTO_CLIENT_EC_OK_value())
{
var data = new byte[op.ReadLength];
var data = new byte[op.Impl.ReadLength];
unsafe
{
byte* b = (byte*)op.Buffer;
for (uint i = 0; i < (uint)op.ReadLength; i++)
byte* b = (byte*)op.Impl.Buffer;
for (uint i = 0; i < (uint)op.Impl.ReadLength; i++)
{
data[i] = b[i];
}
Expand All @@ -142,18 +170,17 @@ public async Task<byte[]> ReadAllAsync(int bytes)
var future = FutureImpl.Create(_client);

var op = new ReadOperation(bytes);
var gcHandle = GCHandle.Alloc(op, GCHandleType.Pinned);

NabtoClientNative.nabto_client_stream_read_all(GetHandle(), future.GetHandle(), op.Buffer, op.BufferLength, out op.ReadLength);
NabtoClientNative.nabto_client_stream_read_all(GetHandle(), future.GetHandle(), op.Impl.Buffer, op.Impl.BufferLength, out op.Impl.ReadLength);

var ec = await future.WaitAsync();
if (ec == NabtoClientNative.NABTO_CLIENT_EC_OK_value())
{
var data = new byte[op.ReadLength];
var data = new byte[op.Impl.ReadLength];
unsafe
{
byte* b = (byte*)op.Buffer;
for (uint i = 0; i < (uint)op.ReadLength; i++)
byte* b = (byte*)op.Impl.Buffer;
for (uint i = 0; i < (uint)op.Impl.ReadLength; i++)
{
data[i] = b[i];
}
Expand All @@ -175,9 +202,8 @@ public async Task WriteAsync(byte[] data)
var future = FutureImpl.Create(_client);

var op = new WriteOperation(data);
var gcHandle = GCHandle.Alloc(op, GCHandleType.Pinned);

NabtoClientNative.nabto_client_stream_write(GetHandle(), future.GetHandle(), op.Buffer, op.BufferLength);
NabtoClientNative.nabto_client_stream_write(GetHandle(), future.GetHandle(), op.Impl.Buffer, op.Impl.BufferLength);

var ec = await future.WaitAsync();

Expand Down
3 changes: 3 additions & 0 deletions test-samples/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The "test-samples" are samples used for testing something specific, they are not
meant as simple examples for learning how to use the library, but can be used as
inspiration.
71 changes: 71 additions & 0 deletions test-samples/StreamReader/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
using Nabto.Edge.Client;
using Microsoft.Extensions.Logging;
using System.CommandLine;

class Program
{
private static readonly AutoResetEvent _closing = new AutoResetEvent(false);


protected static void ctrlCHandler(object? sender, ConsoleCancelEventArgs args)
{
_closing.Set();
}

public static async Task Main(string[] args)
{

var client = INabtoClient.Create();
using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
var logger = loggerFactory.CreateLogger<INabtoClient>();
client.SetLogger(logger);

var productIdOption = new Option<string?>(
name: "-p",
description: "The product id to use."
)
{ IsRequired = true };

var deviceIdOption = new Option<string?>(
name: "-d",
description: "The device id to use.")
{ IsRequired = true };

var rootCommand = new RootCommand("Stream Reader App");
rootCommand.AddOption(productIdOption);
rootCommand.AddOption(deviceIdOption);

rootCommand.SetHandler(async (productIdValue, deviceIdValue) =>
{

var connection = client.CreateConnection();
var privateKey = client.CreatePrivateKey();
connection.SetOptions(new ConnectionOptions { PrivateKey = privateKey, ProductId = productIdValue, DeviceId = deviceIdValue, ServerConnectToken = "demosct" });

await connection.ConnectAsync();

var stream = connection.CreateStream();
await stream.OpenAsync(1234);
var readTotal = 0;
var readIncremental = 0;
while (true) {
var data = await stream.ReadSomeAsync(1024);
readTotal += data.Length;
readIncremental += data.Length;

if (readIncremental > 1024 * 1024)
{
Console.WriteLine("Total Read: " + readTotal);
readIncremental = 0;
}
}


Console.WriteLine("Press CTRL-C to exit.");
Console.CancelKeyPress += new ConsoleCancelEventHandler(ctrlCHandler);
_closing.WaitOne();
}, productIdOption, deviceIdOption);
await rootCommand.InvokeAsync(args);
}

};
17 changes: 17 additions & 0 deletions test-samples/StreamReader/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
This test sample receives a lot of streaming data.

Start the streaming device as:
```
cd testDevice
npm install
PRODUCT_ID=... DEVICE_ID=... PRIVATE_KEY=... npx ts-node testDevice.ts
```

Create a private key with the edge key tool and upload the fingerprint.

Start run the dotnet sample as:
```
dotnet run -- -p ... -d ...
```

The sample will receive a lot of streaming data until it is closed.
19 changes: 19 additions & 0 deletions test-samples/StreamReader/StreamReader.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<ItemGroup>
<ProjectReference Include="..\..\src\NabtoClient\NabtoClient.csproj" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="7.0.0" />
<PackageReference Include="System.CommandLine" Version="2.0.0-beta4.22272.1" />
</ItemGroup>

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net7.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

</Project>
1 change: 1 addition & 0 deletions test-samples/StreamReader/testDevice/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
node_modules
Loading
Loading