Parallel worker pool uses the PHP parallel extension to provide a simple interface for running tasks in parallel.
The WorkerPool requires an implementation of the WorkFactoryInterface, which is responsible for creating the producer and consumer closures.
A producer closure must return a Generator.
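As a minimal sketch of the Generator requirement, the plain-PHP snippet below shows that any closure containing `yield` returns a Generator when called. It does not use the parallel extension or WorkerPool, and the name `$producer` is hypothetical, chosen only for this illustration.

```php
<?php
// Plain-PHP illustration; no parallel extension or WorkerPool involved.
// $producer is a hypothetical name used only for this sketch.
$producer = function (): \Generator {
    for ($i = 0; $i < 3; $i++) {
        yield $i; // the presence of `yield` makes this closure a generator function
    }
};

// Calling the closure does not run the loop yet; it returns a Generator object.
$generator = $producer();
var_dump($generator instanceof \Generator); // bool(true)

// Iterating the Generator runs the loop body one item at a time.
foreach ($generator as $item) {
    echo $item, PHP_EOL; // prints 0, 1, 2 on separate lines
}
```

A closure that builds and returns an array instead would not satisfy the requirement stated above, because calling it would not produce a Generator.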
composer require hdvianna/parallel-workerpool
docker-compose up
Docker Compose builds an environment with the required extensions installed and creates a bind mount to the current directory.
In this example, 10 workers sleep for a random time between 300 and 1000 milliseconds each time they consume a work item generated by the work factory.
use hdvianna\Concurrent\WorkFactoryInterface;
use hdvianna\Concurrent\WorkerPool;

(new WorkerPool(new class implements WorkFactoryInterface {
    // Producer: yields 100 work items, each with an id and a random sleep time.
    public function createWorkGeneratorClosure(): \Closure
    {
        return function () {
            for ($i = 0; $i < 100; $i++) {
                $work = new \stdClass();
                $work->time = mt_rand(300, 1000); // milliseconds
                $work->id = $i;
                yield $work;
            }
        };
    }

    // Consumer: sleeps for the time carried by each work item it receives.
    public function createWorkConsumerClosure(): \Closure
    {
        return function ($work) {
            printf("[%d]: Sleeping for %d milliseconds ...%s", $work->id, $work->time, PHP_EOL);
            usleep($work->time * 1000); // usleep() expects microseconds
            printf("[%d]: Woke up after %d milliseconds ...%s", $work->id, $work->time, PHP_EOL);
        };
    }
}, 10))->run(); // 10 workers
Data can be synchronized with the lock and unlock closures that are passed to the consumer closure.
Calling the $lock closure blocks until the shared data is available and returns it; the updated data is then passed to the $unlock closure.
The last value sent to $unlock can be retrieved by calling WorkerPool::lastValue().
use hdvianna\Concurrent\WorkFactoryInterface;
use hdvianna\Concurrent\WorkerPool;

$sharedData = 700;
$works = 1000;

$pool = new WorkerPool((new class ($sharedData, $works) implements WorkFactoryInterface {
    /**
     * @var int
     */
    private $sharedData;

    /**
     * @var int
     */
    private $works;

    /**
     * Constructor.
     * @param int $sharedData
     * @param int $works
     */
    public function __construct($sharedData, $works)
    {
        $this->works = $works;
        $this->sharedData = $sharedData;
    }

    public function createWorkGeneratorClosure(): \Closure
    {
        $works = $this->works;
        return function () use ($works) {
            for ($i = 0; $i < $works; $i++) {
                $work = new \stdClass();
                $work->value = 1;
                yield $work;
            }
        };
    }

    public function createWorkConsumerClosure(): \Closure
    {
        $initialValue = $this->sharedData;
        // Use the $lock and $unlock closures to synchronize data.
        return function ($work, $lock, $unlock) use ($initialValue) {
            /* Synchronize the data. Will block and wait for data.
               $lock will return the last value. */
            $shared = $lock();
            if (!isset($shared)) {
                // Data was not initialized.
                $shared = $initialValue;
            }
            $shared += $work->value;
            // Unlock, sending the new data.
            $unlock($shared);
        };
    }
}), 10);

$pool->run();

// Get the last value sent to the unlock closure.
$result = $pool->lastValue();
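echo "Is \$result equal to \$works + \$sharedData?" . PHP_EOL;
echo "(Is $result equal to $works + $sharedData?)" . PHP_EOL;
// Compare directly instead of wrapping the check in assert(): depending on the
// zend.assertions setting, assert() either skips the check or throws on failure,
// so the "No" branch would never be printed.
echo ($result === ($works + $sharedData) ? "Yes!" : "No =(") . PHP_EOL;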