Skip to content

Commit

Permalink
Check same queue messages for duplicates before run
Browse files Browse the repository at this point in the history
ENG-12746
  • Loading branch information
hadomskyi committed Nov 30, 2023
1 parent 2449f97 commit 950bfb0
Show file tree
Hide file tree
Showing 4 changed files with 359 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/parameters/CraftliltpluginParameters.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use LiltConnectorSDK\Model\SettingsResponse;
use lilthq\craftliltplugin\services\listeners\AfterDraftAppliedListener;
use lilthq\craftliltplugin\services\listeners\AfterErrorListener;
use lilthq\craftliltplugin\services\listeners\QueueBeforePushListener;
use lilthq\craftliltplugin\services\listeners\RegisterCpAlertsListener;
use lilthq\craftliltplugin\services\listeners\RegisterDefaultTableAttributesListener;
use lilthq\craftliltplugin\services\listeners\RegisterElementActionsListener;
Expand Down Expand Up @@ -72,6 +73,7 @@ class CraftliltpluginParameters
AfterErrorListener::class,
RegisterCpAlertsListener::class,
RegisterElementActionsListener::class,
QueueBeforePushListener::class,
];

public const TRANSLATION_WORKFLOW_INSTANT = SettingsResponse::LILT_TRANSLATION_WORKFLOW_INSTANT;
Expand Down
113 changes: 113 additions & 0 deletions src/services/listeners/QueueBeforePushListener.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
<?php

/**
* @link https://github.com/lilt
* @copyright Copyright (c) 2023 Lilt Devs
*/

declare(strict_types=1);

namespace lilthq\craftliltplugin\services\listeners;

use Craft;
use craft\queue\Queue;
use lilthq\craftliltplugin\modules\FetchInstantJobTranslationsFromConnector;
use lilthq\craftliltplugin\modules\FetchJobStatusFromConnector;
use lilthq\craftliltplugin\modules\FetchTranslationFromConnector;
use lilthq\craftliltplugin\modules\FetchVerifiedJobTranslationsFromConnector;
use lilthq\craftliltplugin\modules\SendJobToConnector;
use lilthq\craftliltplugin\modules\SendTranslationToConnector;
use yii\base\Event;
use yii\queue\ExecEvent;
use yii\queue\PushEvent;

class QueueBeforePushListener implements ListenerInterface
{
private const SUPPORTED_JOBS = [
FetchJobStatusFromConnector::class,
FetchInstantJobTranslationsFromConnector::class,
FetchVerifiedJobTranslationsFromConnector::class,
FetchTranslationFromConnector::class,
SendJobToConnector::class,
SendTranslationToConnector::class,
];

public function register(): void
{
Event::on(
Queue::class,
Queue::EVENT_BEFORE_PUSH,
[$this, '__invoke']
);
}

private function isEventEligible(Event $event): bool
{
if (!$event instanceof PushEvent) {
return false;
}

if ($event->job === null) {
return false;
}

$jobClass = get_class($event->job);

return in_array($jobClass, self::SUPPORTED_JOBS);
}

/**
* @var ExecEvent $event
*/
public function __invoke(Event $event): Event
{
if (!$this->isEventEligible($event)) {
return $event;
}

/**
* @var FetchTranslationFromConnector|FetchJobStatusFromConnector|SendJobToConnector $newQueueJob
*/
$newQueueJob = $event->job;

$jobsInfo = Craft::$app->getQueue()->getJobInfo();

// Release all previously queued jobs for lilt plugin jobs
foreach ($jobsInfo as $jobInfo) {
$jobDetails = Craft::$app->getQueue()->getJobDetails((string)$jobInfo['id']);

if (!in_array(get_class($jobDetails['job']), self::SUPPORTED_JOBS)) {
continue;
}

/**
* @var FetchTranslationFromConnector|FetchJobStatusFromConnector|SendJobToConnector $newQueueJob
*/
$existingQueueJob = $jobDetails['job'];

if (property_exists($existingQueueJob, 'translationId')) {
// compare if job exist for this job id and translation id
if (
$newQueueJob->jobId === $existingQueueJob->jobId
&& $newQueueJob->translationId === $existingQueueJob->translationId
) {
// we already have this job in process, skipping push
$event->handled = true;
return $event;
}

continue;
}

// compare if job exist for this job id
if ($newQueueJob->jobId === $existingQueueJob->jobId) {
// we already have this job in process, skipping push
$event->handled = true;
return $event;
}
}


return $event;
}
}
8 changes: 8 additions & 0 deletions tests/_support/Helper/CraftLiltPluginHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
use Codeception\Module;
use Craft;
use craft\queue\BaseJob;
use craft\queue\QueueInterface;
use lilthq\craftliltplugin\Craftliltplugin;
use lilthq\craftliltplugin\elements\Job;
use lilthq\craftliltplugin\records\I18NRecord;
Expand Down Expand Up @@ -372,6 +373,13 @@ public function runQueue(string $queueItem, array $params = []): void
Craft::$app->getQueue()->run();
}

public function clearQueue(): void
{
/** @var QueueInterface $queue */
$queue = Craft::$app->getQueue();
$queue->releaseAll();
}

public function executeQueue(string $queueItem, array $params = []): void
{
/** @var BaseJob $job */
Expand Down
236 changes: 236 additions & 0 deletions tests/integration/modules/QueueBeforePushListenerCest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
<?php

declare(strict_types=1);

use craft\helpers\Queue;
use lilthq\craftliltplugin\modules\SendJobToConnector;
use lilthq\craftliltplugin\modules\SendTranslationToConnector;
use PHPUnit\Framework\Assert;

class QueueBeforePushListenerCest
{
public function testNoSameJobInQueue_SendJobToConnector(IntegrationTester $I): void
{
$I->clearQueue();

$jobClass = 'lilthq\craftliltplugin\modules\SendJobToConnector';

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 345
]),
SendJobToConnector::PRIORITY,
SendJobToConnector::DELAY_IN_SECONDS
);

$jobInfos = Craft::$app->queue->getJobInfo();
$jobs = [];

foreach ($jobInfos as $jobInfo) {
$actual = Craft::$app->queue->getJobDetails($jobInfo['id']);

if (get_class($actual['job']) !== $jobClass) {
continue;
}

/**
* @var SendJobToConnector $actual
*/
$jobs[$actual['job']->jobId] = ['attempt' => $actual['job']->attempt];
}

Assert::assertCount(1, $jobs);
Assert::assertArrayHasKey(123, $jobs);
Assert::assertEquals(['attempt' => 345], $jobs['123']);

$I->assertJobInQueue(
new SendJobToConnector([
'jobId' => 123,
'attempt' => 345
])
);
}

public function testSameJobInQueue_SendJobToConnector(IntegrationTester $I): void
{
$I->clearQueue();

$jobClass = 'lilthq\craftliltplugin\modules\SendJobToConnector';

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 678
]),
SendJobToConnector::PRIORITY,
SendJobToConnector::DELAY_IN_SECONDS
);

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 345
]),
SendJobToConnector::PRIORITY,
SendJobToConnector::DELAY_IN_SECONDS
);

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 91011
]),
SendJobToConnector::PRIORITY,
SendJobToConnector::DELAY_IN_SECONDS
);

$jobInfos = Craft::$app->queue->getJobInfo();
$jobs = [];

foreach ($jobInfos as $jobInfo) {
$actual = Craft::$app->queue->getJobDetails($jobInfo['id']);

if (get_class($actual['job']) !== $jobClass) {
continue;
}

/**
* @var SendJobToConnector $actual
*/
$jobs[$actual['job']->jobId] = ['attempt' => $actual['job']->attempt];
}

Assert::assertCount(1, $jobs);
Assert::assertArrayHasKey(123, $jobs);
Assert::assertEquals(['attempt' => 678], $jobs['123']);

$I->assertJobInQueue(
new SendJobToConnector([
'jobId' => 123,
'attempt' => 678
])
);
}

public function testNoSameJobInQueue_SendTranslationToConnector(IntegrationTester $I): void
{
$I->clearQueue();

$jobClass = 'lilthq\craftliltplugin\modules\SendTranslationToConnector';

Queue::push(
new $jobClass([
'jobId' => 123,
'translationId' => 112233,
'attempt' => 345,
]),
SendTranslationToConnector::PRIORITY,
SendTranslationToConnector::DELAY_IN_SECONDS
);

$jobInfos = Craft::$app->queue->getJobInfo();
$jobs = [];

foreach ($jobInfos as $jobInfo) {
$actual = Craft::$app->queue->getJobDetails($jobInfo['id']);

if (get_class($actual['job']) !== $jobClass) {
continue;
}

/**
* @var SendTranslationToConnector $actualJob
*/
$actualJob = $actual['job'];

$jobs[$actual['job']->jobId] = [
'attempt' => $actualJob->attempt,
'translationId' => $actualJob->translationId
];
}

Assert::assertCount(1, $jobs);
Assert::assertArrayHasKey(123, $jobs);
Assert::assertEquals(['attempt' => 345, 'translationId' => 112233], $jobs['123']);

$I->assertJobInQueue(
new SendTranslationToConnector([
'jobId' => 123,
'attempt' => 345,
'translationId' => 112233
])
);
}

public function testSameJobInQueue_SendTranslationToConnector(IntegrationTester $I): void
{
$I->clearQueue();

$jobClass = 'lilthq\craftliltplugin\modules\SendTranslationToConnector';

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 678,
'translationId' => 112233
]),
SendTranslationToConnector::PRIORITY,
SendTranslationToConnector::DELAY_IN_SECONDS
);

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 345,
'translationId' => 112233
]),
SendTranslationToConnector::PRIORITY,
SendTranslationToConnector::DELAY_IN_SECONDS
);

Queue::push(
new $jobClass([
'jobId' => 123,
'attempt' => 91011,
'translationId' => 112233
]),
SendTranslationToConnector::PRIORITY,
SendTranslationToConnector::DELAY_IN_SECONDS
);

$jobInfos = Craft::$app->queue->getJobInfo();
$jobs = [];

foreach ($jobInfos as $jobInfo) {
$actual = Craft::$app->queue->getJobDetails($jobInfo['id']);

if (get_class($actual['job']) !== $jobClass) {
continue;
}

/**
* @var SendTranslationToConnector $actual
*/
$jobs[$actual['job']->jobId] = [
'attempt' => $actual['job']->attempt,
'translationId' => 112233,
];
}

Assert::assertCount(1, $jobs);
Assert::assertArrayHasKey(123, $jobs);
Assert::assertEquals([
'attempt' => 678,
'translationId' => 112233
], $jobs['123']);

$I->assertJobInQueue(
new SendTranslationToConnector([
'jobId' => 123,
'attempt' => 678,
'translationId' => 112233
])
);
}
}

0 comments on commit 950bfb0

Please sign in to comment.