Skip to content

Commit

Permalink
Update SignalQueue to keep the order
Browse files Browse the repository at this point in the history
  • Loading branch information
roxblnfk committed Mar 10, 2025
1 parent 86d9e8e commit 7745a4b
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 30 deletions.
65 changes: 35 additions & 30 deletions src/Internal/Declaration/WorkflowInstance/SignalQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
final class SignalQueue
{
/**
* @var array<non-empty-string, list<ValuesInterface>>
* @var array<int, SignalQueueItem>
*/
private array $queue = [];

Expand All @@ -48,12 +48,16 @@ final class SignalQueue
public function push(string $signal, ValuesInterface $values): void
{
if (isset($this->consumers[$signal])) {
($this->onSignal)($signal, $this->consumers[$signal], $values);
$this->consume($signal, $values, $this->consumers[$signal]);
return;
}

$this->queue[$signal][] = $values;
$this->flush($signal);
if ($this->fallbackConsumer !== null) {
$this->consumeFallback($signal, $values);
return;
}

$this->queue[] = new SignalQueueItem($signal, $values);
}

/**
Expand All @@ -71,7 +75,13 @@ public function onSignal(\Closure $handler): void
public function attach(string $signal, callable $consumer): void
{
$this->consumers[$signal] = $consumer; // overwrite
$this->flush($signal);

foreach ($this->queue as $k => $item) {
if ($item->name === $signal) {
unset($this->queue[$k]);
$this->consume($signal, $item->values, $consumer);
}
}
}

/**
Expand All @@ -82,10 +92,13 @@ public function setFallback(\Closure $consumer): void
$this->fallbackConsumer = $consumer;

// Flush all signals that have no consumer
foreach (\array_diff_key($this->queue, $this->consumers) as $signal => $list) {
if ($list !== []) {
$this->flush($signal);
foreach ($this->queue as $k => $item) {
if (\array_key_exists($item->name, $this->consumers)) {
continue;
}

unset($this->queue[$k]);
$this->consumeFallback($item->name, $item->values);
}
}

Expand All @@ -96,31 +109,23 @@ public function clear(): void

/**
* @param non-empty-string $signal
*
* @psalm-suppress UnusedVariable
* @param Consumer $consumer
*/
private function flush(string $signal): void
private function consume(string $signal, ValuesInterface $values, callable $consumer): void
{
if (!isset($this->queue[$signal])) {
return;
}

$consumer = $this->consumers[$signal] ?? null;

if ($consumer === null) {
if ($this->fallbackConsumer === null) {
return;
}

// Wrap the fallback consumer to call interceptors
$handler = $this->fallbackConsumer;
$consumer = static fn(ValuesInterface $values): mixed => $handler($signal, $values);
}
($this->onSignal)($signal, $consumer, $values);
}

while ($this->queue[$signal] !== []) {
$args = \array_shift($this->queue[$signal]);
/**
* @param non-empty-string $signal
*/
private function consumeFallback(string $signal, ValuesInterface $values): void
{
$handler = $this->fallbackConsumer;
\assert($handler !== null);

($this->onSignal)($signal, $consumer, $args);
}
// Wrap the fallback consumer to call interceptors
$consumer = static fn(ValuesInterface $values): mixed => $handler($signal, $values);
($this->onSignal)($signal, $consumer, $values);
}
}
23 changes: 23 additions & 0 deletions src/Internal/Declaration/WorkflowInstance/SignalQueueItem.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Internal\Declaration\WorkflowInstance;

use Temporal\DataConverter\ValuesInterface;

final class SignalQueueItem
{
public function __construct(
/** @var non-empty-string */
public readonly string $name,
public readonly ValuesInterface $values,
) {}
}

0 comments on commit 7745a4b

Please sign in to comment.