Skip to content

Commit

Permalink
Merge pull request #7 from makasim/add-mongodb-push-operator-support
Browse files Browse the repository at this point in the history
add support of mongodb push operator.
  • Loading branch information
makasim authored Oct 25, 2017
2 parents 522854b + fbe58aa commit 47a5946
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 12 deletions.
38 changes: 35 additions & 3 deletions src/Converter.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@ class Converter
public static function convertJsonPatchToMongoUpdate(array $diff)
{
$update = ['$set' => [], '$unset' => []];

foreach ($diff as $op) {
if (isset($op['path']) && '/_id' == $op['path']) {
continue;
}

switch ($op['op']) {
case 'add':
if (is_array($op['value'])) {
if (static::isPathArray($op['path'])) {
if (false == isset($update['$push'][self::pathToDotWithoutLastPart($op['path'])]['$each'])) {
$update['$push'][self::pathToDotWithoutLastPart($op['path'])]['$each'] = [];
}

$update['$push'][self::pathToDotWithoutLastPart($op['path'])]['$each'][] = $op['value'];
} else if (is_array($op['value'])) {
foreach ($op['value'] as $key => $value) {
$update['$set'][self::pathToDot($op['path']).'.'.$key] = $value;
$update['$set'][self::pathToDot($op['path']) . '.' . $key] = $value;
}
} else {
$update['$set'][self::pathToDot($op['path'])] = $op['value'];
Expand All @@ -42,6 +49,10 @@ public static function convertJsonPatchToMongoUpdate(array $diff)

}

if (empty($update['$push'])) {
unset($update['$push']);
}

if (empty($update['$set'])) {
unset($update['$set']);
}
Expand All @@ -57,10 +68,31 @@ public static function convertJsonPatchToMongoUpdate(array $diff)
*
* @return string
*/
private static function pathToDot($path)
private static function pathToDot(string $path): string
{
$path = ltrim($path, '/');

return str_replace('/', '.', $path);
}

/**
* @param string $path
*
* @return string
*/
private static function pathToDotWithoutLastPart(string $path): string
{
$parts = explode('/', ltrim($path));

array_pop($parts);

return static::pathToDot(implode('/', $parts));
}

private static function isPathArray(string $path): bool
{
$parts = explode('/', ltrim($path));

return is_numeric(array_pop($parts));
}
}
21 changes: 12 additions & 9 deletions src/PessimisticLock.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<?php
namespace Makasim\Yadm;

use MongoDB\BSON\ObjectID;
use MongoDB\BSON\UTCDatetime;
use MongoDB\Collection;
use MongoDB\Driver\Exception\BulkWriteException;
Expand All @@ -19,15 +18,16 @@ class PessimisticLock
* @var string
*/
private $sessionId;

/**
* @param Collection $collection
* @param string $sessionId
* @var int
*/
public function __construct(Collection $collection, $sessionId = null)
private $limit;

public function __construct(Collection $collection, string $sessionId = null, int $limit = 300)
{
$this->collection = $collection;
$this->sessionId = $sessionId ?: getmypid().'-'.(microtime(true)*10000);
$this->limit = $limit;

register_shutdown_function(function () { $this->unlockAll(); });
}
Expand All @@ -38,14 +38,16 @@ public function __construct(Collection $collection, $sessionId = null)
* @param string $id
* @param int $limit
*/
public function lock($id, $limit = 300)
public function lock(string $id, int $limit = 300): void
{
$this->createIndexes();

$timeout = time() + $limit; // I think it must be a bit greater then mongos index ttl so there is a way to process data.

while (time() < $timeout) {
try {
$result = $this->collection->insertOne([
'_id' => new ObjectID((string) $id),
'id' => $id,
'timestamp' => new UTCDatetime(time() * 1000),
'sessionId' => $this->sessionId,
]);
Expand Down Expand Up @@ -73,10 +75,10 @@ public function lock($id, $limit = 300)
/**
* @param string $id
*/
public function unlock($id)
public function unlock(string $id): void
{
$result = $this->collection->deleteOne([
'_id' => new ObjectID((string) $id),
'id' => $id,
'sessionId' => $this->sessionId,
]);

Expand All @@ -103,6 +105,7 @@ public function createIndexes()
} catch (RuntimeException $e) {
}

$this->collection->createIndex(['id' => 1], ['unique' => true]);
$this->collection->createIndex(['timestamp' => 1], ['expireAfterSeconds' => 302]);
$this->collection->createIndex(['sessionId' => 1], ['unique' => false]);
}
Expand Down
12 changes: 12 additions & 0 deletions src/Storage.php
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,20 @@ public function update($model, $filter = null, array $options = [])
return;
}

// mongodb's update cannot do a change of existing element and push a new one to a collection.
$pushUpdate = [];
if (array_key_exists('$push', $update)) {
$pushUpdate['$push'] = $update['$push'];

unset($update['$push']);
}

$result = $this->collection->updateOne($filter, $update, $options);

if ($pushUpdate) {
$this->collection->updateOne($filter, $pushUpdate, $options);
}

if ($result->getUpsertedCount()) {
set_object_id($model, new ObjectID((string) $result->getUpsertedId()));
}
Expand Down

0 comments on commit 47a5946

Please sign in to comment.