Skip to content

Commit

Permalink
Merge pull request #38 from santisq/37-invoke-parallel-not-correctly-…
Browse files Browse the repository at this point in the history
…disposing-runspaces-take-2

Fixes `Invoke-Parallel` not disposing Runspaces, take 2
  • Loading branch information
santisq authored Jul 3, 2024
2 parents 50270fb + 13ca3e5 commit 04ce513
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 89 deletions.
2 changes: 1 addition & 1 deletion module/PSParallelPipeline.psd1
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
RootModule = 'bin/netstandard2.0/PSParallelPipeline.dll'

# Version number of this module.
ModuleVersion = '1.1.8'
ModuleVersion = '1.1.9'

# Supported PSEditions
# CompatiblePSEditions = @()
Expand Down
8 changes: 5 additions & 3 deletions src/PSParallelPipeline/Commands/InvokeParallelCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,12 @@ protected override void ProcessRecord()
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.Stop();
_worker.StopAndWait();
throw;
}
catch (OperationCanceledException exception)
{
_worker.WaitOperationCanceled();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
Expand All @@ -119,11 +120,12 @@ protected override void EndProcessing()
}
catch (Exception _) when (_ is PipelineStoppedException or FlowControlException)
{
_worker.Stop();
_worker.StopAndWait();
throw;
}
catch (OperationCanceledException exception)
{
_worker.WaitOperationCanceled();
exception.WriteTimeoutError(this);
}
catch (Exception exception)
Expand Down Expand Up @@ -166,7 +168,7 @@ private void ProcessOutput(PSOutputData data)
}
}

protected override void StopProcessing() => _worker?.Stop();
protected override void StopProcessing() => _worker?.StopAndWait();

public void Dispose()
{
Expand Down
4 changes: 2 additions & 2 deletions src/PSParallelPipeline/PSOutputStreams.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ internal sealed class PSOutputStreams : IDisposable
internal PSOutputStreams(Worker worker)
{
_worker = worker;
SetStreams(this);
SetStreamHandlers(this);
}

internal void AddOutput(PSOutputData data) => OutputPipe.Add(data, Token);

private static void SetStreams(PSOutputStreams outputStreams)
private static void SetStreamHandlers(PSOutputStreams outputStreams)
{
outputStreams.Success.DataAdded += (s, e) =>
{
Expand Down
20 changes: 8 additions & 12 deletions src/PSParallelPipeline/PSTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,18 @@ internal sealed class PSTask : IDisposable
{
private PSOutputStreams OutputStreams { get => _pool.PSOutputStreams; }

internal Runspace Runspace
{
get => _powershell.Runspace;
set => _powershell.Runspace = value;
}

private readonly PowerShell _powershell;

private readonly PSDataStreams _internalStreams;

private readonly RunspacePool _pool;

internal Runspace Runspace
{
get => _powershell.Runspace;
set => _powershell.Runspace = value;
}

private PSTask(RunspacePool runspacePool)
{
_powershell = PowerShell.Create();
Expand Down Expand Up @@ -92,12 +92,8 @@ internal async Task<PSTask> InvokeAsync()

private static Action CancelCallback(PSTask task) => delegate
{
task._powershell.BeginStop((e) =>
{
task._powershell.EndStop(e);
task.Runspace.Dispose();
task.Dispose();
}, null);
task.Dispose();
task.Runspace.Dispose();
};

public void Dispose()
Expand Down
52 changes: 18 additions & 34 deletions src/PSParallelPipeline/RunspacePool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,27 @@ internal sealed class RunspacePool : IDisposable

private InitialSessionState InitialSessionState { get => _settings.InitialSessionState; }

private int MaxRunspaces { get => _settings.MaxRunspaces; }

private bool UseNewRunspace { get => _settings.UseNewRunspace; }

private Dictionary<string, object?> UsingStatements { get => _settings.UsingStatements; }

private int _totalMade;

private readonly Queue<Runspace> _pool;

private readonly List<Task<PSTask>> _tasks;

private readonly PoolSettings _settings;

private readonly Worker _worker;

private readonly List<Runspace> _createdRunspaces;

private readonly TaskManager _manager;

internal RunspacePool(PoolSettings settings, Worker worker)
{
_settings = settings;
_worker = worker;
_pool = new Queue<Runspace>(MaxRunspaces);
_tasks = new List<Task<PSTask>>(MaxRunspaces);
_createdRunspaces = new List<Runspace>(MaxRunspaces);
_pool = new Queue<Runspace>(_settings.MaxRunspaces);
_createdRunspaces = new List<Runspace>(_settings.MaxRunspaces);
_manager = new(_settings.MaxRunspaces);
}

private Runspace CreateRunspace()
Expand All @@ -56,68 +52,59 @@ private async Task<Runspace> GetRunspaceAsync()
return _pool.Dequeue();
}

if (_totalMade == MaxRunspaces)
if (_manager.ShouldProcess)
{
await ProcessTaskAsync();
Token.ThrowIfCancellationRequested();
return _pool.Dequeue();
}

_totalMade++;
return CreateRunspace();
}

internal async Task ProcessTasksAsync()
{
while (_tasks.Count > 0)
while (_manager.HasMoreTasks)
{
await ProcessTaskAsync();
Token.ThrowIfCancellationRequested();
}
}

internal async Task EnqueueAsync(PSTask task)
internal async Task EnqueueAsync(PSTask psTask)
{
if (UsingStatements is { Count: > 0 })
{
task.AddUsingStatements(UsingStatements);
psTask.AddUsingStatements(UsingStatements);
}

task.Runspace = await GetRunspaceAsync();
_tasks.Add(task.InvokeAsync());
psTask.Runspace = await GetRunspaceAsync();
_manager.Enqueue(psTask);
}

private async Task ProcessTaskAsync()
{
PSTask? ps = null;
PSTask? pSTask = null;

try
{
Token.ThrowIfCancellationRequested();
Task<PSTask> awaiter = await Task.WhenAny(_tasks);
_tasks.Remove(awaiter);
ps = await awaiter;
Runspace runspace = ps.Runspace;
Task<PSTask> awaiter = await _manager.WhenAny();
Runspace runspace = _manager.Dequeue(awaiter);

if (UseNewRunspace)
{
ps.Runspace.Dispose();
runspace.Dispose();
runspace = CreateRunspace();
}

_pool.Enqueue(runspace);
}
catch (Exception _) when (_ is TaskCanceledException or OperationCanceledException)
{
throw;
pSTask = await awaiter;
}
catch (Exception exception)
{
PSOutputStreams.AddOutput(exception.CreateProcessingTaskError(this));
}
finally
{
ps?.Dispose();
pSTask?.Dispose();
}
}

Expand All @@ -128,10 +115,7 @@ public void Dispose()
{
foreach (Runspace runspace in _createdRunspaces)
{
if (runspace is { RunspaceAvailability: RunspaceAvailability.Available })
{
runspace.Dispose();
}
runspace.Dispose();
}

GC.SuppressFinalize(this);
Expand Down
42 changes: 42 additions & 0 deletions src/PSParallelPipeline/TaskManager.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
using System.Collections.Generic;
using System.Management.Automation.Runspaces;
using System.Threading.Tasks;

namespace PSParallelPipeline;

internal sealed class TaskManager
{
private readonly List<Task<PSTask>> _tasks;

private readonly Dictionary<int, Runspace> _assignedRunspaces;

private readonly int _maxRunspaces;

internal bool ShouldProcess { get => _tasks.Count == _maxRunspaces; }

internal bool HasMoreTasks { get => _tasks.Count > 0; }

internal TaskManager(int maxRunspaces)
{
_maxRunspaces = maxRunspaces;
_tasks = new List<Task<PSTask>>(maxRunspaces);
_assignedRunspaces = new Dictionary<int, Runspace>(maxRunspaces);
}

internal void Enqueue(PSTask psTask)
{
Task<PSTask> task = psTask.InvokeAsync();
_assignedRunspaces[task.Id] = psTask.Runspace;
_tasks.Add(task);
}

internal Task<Task<PSTask>> WhenAny() => Task.WhenAny(_tasks);

internal Runspace Dequeue(Task<PSTask> psTask)
{
Runspace runspace = _assignedRunspaces[psTask.Id];
_assignedRunspaces.Remove(psTask.Id);
_tasks.Remove(psTask);
return runspace;
}
}
17 changes: 12 additions & 5 deletions src/PSParallelPipeline/Worker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,16 @@ internal Worker(PoolSettings settings)

internal void Wait() => _worker?.GetAwaiter().GetResult();

internal void Stop() => _cts.Cancel();
internal void WaitOperationCanceled() =>
_worker?
.ContinueWith(e => { }, TaskContinuationOptions.OnlyOnCanceled)
.Wait();

internal void StopAndWait()
{
_cts.Cancel();
Wait();
}

internal void CancelAfter(TimeSpan span) => _cts.CancelAfter(span);

Expand All @@ -64,12 +73,10 @@ internal void Start() => _worker = Task.Run(async () =>
{
while (!_inputQueue.IsCompleted)
{
if (!_inputQueue.TryTake(out PSTask ps, 0, Token))
if (_inputQueue.TryTake(out PSTask ps, 0, Token))
{
continue;
await _runspacePool.EnqueueAsync(ps);
}

await _runspacePool.EnqueueAsync(ps);
}

await _runspacePool.ProcessTasksAsync();
Expand Down
Loading

0 comments on commit 04ce513

Please sign in to comment.