Skip to content

Commit

Permalink
Added Paranet Sync operation statuses, refactored paranet sync
Browse files Browse the repository at this point in the history
  • Loading branch information
u-hubar committed Oct 11, 2024
1 parent 27bab37 commit d0a4272
Show file tree
Hide file tree
Showing 7 changed files with 172 additions and 91 deletions.
220 changes: 138 additions & 82 deletions src/commands/paranet/paranet-sync-command.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
/* eslint-disable no-unused-vars */
/* eslint-disable no-await-in-loop */
import { setTimeout } from 'timers/promises';
import Command from '../command.js';
Expand All @@ -8,7 +7,6 @@ import {
OPERATION_ID_STATUS,
CONTENT_ASSET_HASH_FUNCTION_ID,
SIMPLE_ASSET_SYNC_PARAMETERS,
TRIPLE_STORE_REPOSITORIES,
PARANET_SYNC_KA_COUNT,
PARANET_SYNC_RETRIES_LIMIT,
PARANET_SYNC_RETRY_DELAY_MS,
Expand Down Expand Up @@ -78,17 +76,35 @@ class ParanetSyncCommand extends Command {
`Paranet sync: Attempting to sync ${cachedMissedKaCount} missed assets for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}!`,
);

await this.syncMissedKAs(
this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
OPERATION_ID_STATUS.PARANET.PARANET_SYNC_MISSED_KAS_SYNC_START,
);

const [successulMissedSyncsCount, failedMissedSyncsCount] = await this.syncMissedKAs(
paranetUAL,
paranetId,
blockchain,
contract,
tokenId,
paranetMetadata,
paranetNodesAccessPolicy,
operationId,
cachedKaCount,
);

this.logger.info(
`Paranet sync: Successful missed assets syncs: ${successulMissedSyncsCount}; ` +
`Failed missed assets syncs: ${failedMissedSyncsCount} for paranet: ${paranetUAL} ` +
`(${paranetId}), operation ID: ${operationId}!`,
);

this.operationIdService.updateOperationIdStatusWithValues(
operationId,
blockchain,
OPERATION_ID_STATUS.PARANET.PARANET_SYNC_MISSED_KAS_SYNC_END,
successulMissedSyncsCount,
failedMissedSyncsCount,
);
}

// Then, check for new KAs on the blockchain
Expand All @@ -99,7 +115,13 @@ class ParanetSyncCommand extends Command {
} new assets for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}`,
);

await this.syncNewKAs(
this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
OPERATION_ID_STATUS.PARANET.PARANET_SYNC_NEW_KAS_SYNC_START,
);

const [successulNewSyncsCount, failedNewSyncsCount] = await this.syncNewKAs(
cachedKaCount + cachedMissedKaCount,
contractKaCount,
paranetUAL,
Expand All @@ -112,6 +134,20 @@ class ParanetSyncCommand extends Command {
operationId,
cachedKaCount,
);

this.logger.info(
`Paranet sync: Successful new assets syncs: ${successulNewSyncsCount}; ` +
`Failed new assets syncs: ${failedNewSyncsCount} for paranet: ${paranetUAL} ` +
`(${paranetId}), operation ID: ${operationId}!`,
);

this.operationIdService.updateOperationIdStatusWithValues(
operationId,
blockchain,
OPERATION_ID_STATUS.PARANET.PARANET_SYNC_NEW_KAS_SYNC_END,
successulNewSyncsCount,
failedNewSyncsCount,
);
} else {
this.logger.info(
`Paranet sync: No new assets to sync for paranet: ${paranetUAL} (${paranetId}), operation ID: ${operationId}!`,
Expand All @@ -122,51 +158,35 @@ class ParanetSyncCommand extends Command {
}

async syncAssetState(
operationId,
ual,
blockchain,
contract,
tokenId,
assertionIds,
stateIndex,
paranetId,
paranetTokenId,
paranetRepository,
latestAsset,
deleteFromEarlier,
paranetUAL,
knowledgeAssetId,
paranetNodesAccessPolicy,
paranetMetadata,
) {
const ual = this.ualService.deriveUAL(blockchain, contract, tokenId);
try {
const statePresentInParanetRepository =
await this.tripleStoreService.paranetAssetExists(
blockchain,
contract,
tokenId,
contract,
paranetTokenId,
);

if (statePresentInParanetRepository) {
this.logger.trace(
`Paranet sync: StateIndex: ${stateIndex} for tokenId: ${tokenId} found in triple store blockchain: ${blockchain}`,
);
return true;
}

this.logger.debug(
`Paranet sync: Fetching state index: ${stateIndex + 1} of ${
assertionIds.length
} for asset with ual: ${ual}. blockchain: ${blockchain}`,
);
const assertionId = assertionIds[stateIndex];
const assertionId = assertionIds[stateIndex];

const operationId = await this.operationIdService.generateOperationId(
OPERATION_ID_STATUS.GET.GET_START,
);
this.logger.debug(
`Paranet sync: Fetching state: ${assertionId} index: ${stateIndex + 1} of ${
assertionIds.length
} for asset with ual: ${ual}.`,
);

try {
await Promise.all([
this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
OPERATION_ID_STATUS.GET.GET_START,
),
this.operationIdService.updateOperationIdStatus(
operationId,
blockchain,
Expand All @@ -180,7 +200,7 @@ class ParanetSyncCommand extends Command {
]);

this.logger.debug(
`Paranet sync: Get for ${ual} with operation id ${operationId} initiated. blockchain: ${blockchain}`,
`Paranet sync: Get for ${ual} with operation id ${operationId} initiated.`,
);

if (paranetNodesAccessPolicy === 'OPEN') {
Expand Down Expand Up @@ -262,7 +282,6 @@ class ParanetSyncCommand extends Command {
blockchainId: blockchain,
ual,
paranetUal: paranetUAL,
knowledgeAssetId,
});
return false;
}
Expand All @@ -274,7 +293,6 @@ class ParanetSyncCommand extends Command {
blockchainId: blockchain,
ual,
paranetUal: paranetUAL,
knowledgeAssetId,
});

return false;
Expand All @@ -284,7 +302,7 @@ class ParanetSyncCommand extends Command {
}

async syncAsset(
knowledgeAssetId,
ual,
blockchain,
contract,
tokenId,
Expand All @@ -295,43 +313,33 @@ class ParanetSyncCommand extends Command {
operationId,
removeMissingAssetRecord = false,
) {
const ual = this.ualService.deriveUAL(blockchain, contract, tokenId);

try {
this.logger.info(
`Paranet sync: Syncing asset ID: ${knowledgeAssetId} for paranet: ${paranetId}, operation ID: ${operationId}`,
`Paranet sync: Syncing asset: ${ual} for paranet: ${paranetId}, operation ID: ${operationId}`,
);

const { knowledgeAssetStorageContract, tokenId: kaTokenId } =
await this.blockchainModuleManager.getParanetKnowledgeAssetLocator(
blockchain,
knowledgeAssetId,
);

const assertionIds = await this.blockchainModuleManager.getAssertionIds(
blockchain,
knowledgeAssetStorageContract,
kaTokenId,
contract,
tokenId,
);

let isSuccessful = true;
for (let stateIndex = 0; stateIndex < assertionIds.length; stateIndex += 1) {
isSuccessful =
isSuccessful &&
(await this.syncAssetState(
operationId,
ual,
blockchain,
knowledgeAssetStorageContract,
kaTokenId,
contract,
tokenId,
assertionIds,
stateIndex,
paranetId,
tokenId,
stateIndex === assertionIds.length - 1
? TRIPLE_STORE_REPOSITORIES.PUBLIC_CURRENT
: TRIPLE_STORE_REPOSITORIES.PUBLIC_HISTORY,
stateIndex === assertionIds.length - 1,
paranetUAL,
knowledgeAssetId,
paranetNodesAccessPolicy,
paranetMetadata,
));
Expand All @@ -344,13 +352,12 @@ class ParanetSyncCommand extends Command {
return isSuccessful;
} catch (error) {
this.logger.warn(
`Paranet sync: Failed to sync asset ID: ${knowledgeAssetId} for paranet: ${paranetId}, error: ${error}`,
`Paranet sync: Failed to sync asset: ${ual} for paranet: ${paranetId}, error: ${error}`,
);
await this.repositoryModuleManager.createMissedParanetAssetRecord({
blockchain,
ual,
paranetUAL,
knowledgeAssetId,
});
return false;
}
Expand All @@ -360,8 +367,6 @@ class ParanetSyncCommand extends Command {
paranetUAL,
paranetId,
blockchain,
contract,
tokenId,
paranetMetadata,
paranetNodesAccessPolicy,
operationId,
Expand All @@ -375,21 +380,26 @@ class ParanetSyncCommand extends Command {
PARANET_SYNC_KA_COUNT,
);

const promises = missedParanetAssets.map((missedParanetAsset) =>
this.syncAsset(
const promises = missedParanetAssets.map((missedParanetAsset) => {
const {
blockchain: knowledgeAssetBlockchain,
contract: knowledgeAssetStorageContract,
tokenId: knowledgeAssetTokenId,
} = this.ualService.resolveUAL(missedParanetAsset.ual);

return this.syncAsset(
missedParanetAsset.ual,
missedParanetAsset.knowledgeAssetId,
blockchain,
contract,
tokenId,
knowledgeAssetBlockchain,
knowledgeAssetStorageContract,
knowledgeAssetTokenId,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
operationId,
true,
),
);
);
});

const results = await Promise.all(promises);

Expand All @@ -401,6 +411,8 @@ class ParanetSyncCommand extends Command {
cachedKaCount + successfulCount,
);
}

return [successfulCount, results.length - successfulCount];
}

async syncNewKAs(
Expand All @@ -427,21 +439,63 @@ class ParanetSyncCommand extends Command {
PARANET_SYNC_KA_COUNT,
);
if (!nextKaArray.length) break;
kasToSync.push(...nextKaArray);

const filteredKAs = [];
for (const knowledgeAssetId of nextKaArray) {
const {
blockchain: knowledgeAssetBlockchain,
knowledgeAssetStorageContract,
tokenId: knowledgeAssetTokenId,
} = await this.blockchainModuleManager.getParanetKnowledgeAssetLocator(
blockchain,
knowledgeAssetId,
);

const statePresentInParanetRepository =
await this.tripleStoreService.paranetAssetExists(
blockchain,
knowledgeAssetStorageContract,
knowledgeAssetTokenId,
contract,
tokenId,
);

if (!statePresentInParanetRepository) {
const ual = this.ualService.deriveUAL(
knowledgeAssetBlockchain,
knowledgeAssetStorageContract,
knowledgeAssetTokenId,
);
filteredKAs.push([
ual,
knowledgeAssetBlockchain,
knowledgeAssetStorageContract,
knowledgeAssetTokenId,
]);
}
}

kasToSync.push(...filteredKAs);
}

const promises = kasToSync.map((knowledgeAssetId) =>
this.syncAsset(
knowledgeAssetId,
blockchain,
contract,
tokenId,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
operationId,
),
const promises = kasToSync.map(
([
ual,
knowledgeAssetBlockchain,
knowledgeAssetStorageContract,
knowledgeAssetTokenId,
]) =>
this.syncAsset(
ual,
knowledgeAssetBlockchain,
knowledgeAssetStorageContract,
knowledgeAssetTokenId,
paranetUAL,
paranetId,
paranetMetadata,
paranetNodesAccessPolicy,
operationId,
),
);

const results = await Promise.all(promises);
Expand All @@ -454,6 +508,8 @@ class ParanetSyncCommand extends Command {
cachedKaCount + successfulCount,
);
}

return [successfulCount, results.length - successfulCount];
}

/**
Expand Down
Loading

0 comments on commit d0a4272

Please sign in to comment.