Merge pull request #576 from desci-labs/dataref-dr-ceramic

DataRef Dr Ceramic Update

hubsmoke authored Oct 22, 2024
2 parents a9afdd4 + 28fb07f commit 1f156bb
Showing 4 changed files with 112 additions and 41 deletions.
2 changes: 1 addition & 1 deletion desci-server/package.json
@@ -14,7 +14,7 @@
"script:upgrade-manifests": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/upgradeManifests.ts",
"script:test-upgrade-manifests": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/testUpgradeManifests.ts",
"script:fill-research-fields": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/fill-research-fields.ts",
"script:fix-data-refs": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/dataRefDoctor.ts",
"script:fix-data-refs": "debug=* node --inspect=0.0.0.0:9277 --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/dataRefDoctor.ts",
"script:active-users": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/activeUsers.ts",
"script:invalidate-redis-cache": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/invalidate-redis-cache.ts",
"script:increase-base-drive-storage": "debug=* node --no-warnings --enable-source-maps --loader ts-node/esm ./src/scripts/increase-base-drive-storage.ts",
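The only change to package.json wires Node's inspector into the doctor script. A hedged usage sketch (operation and uuid are placeholders; only the port comes from this diff):

```bash
# Start the script; Node listens for a debugger on 0.0.0.0:9277.
OPERATION=validate NODE_UUID=<node-uuid> PUBLIC_REFS=true npm run script:fix-data-refs

# Then attach Chrome DevTools (chrome://inspect) or a VS Code "attach"
# configuration to port 9277 (map the port first if running in Docker).
```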
103 changes: 70 additions & 33 deletions desci-server/src/scripts/dataRefDoctor.ts
@@ -7,7 +7,7 @@ import { getSizeForCid } from '../services/ipfs.js';
import { getIndexedResearchObjects } from '../theGraph.js';
import { validateAndHealDataRefs, validateDataReferences } from '../utils/dataRefTools.js';
import { cleanupManifestUrl } from '../utils/manifest.js';
- import { hexToCid } from '../utils.js';
+ import { ensureUuidEndsWithDot, hexToCid } from '../utils.js';

/*
Usage Guidelines:
@@ -18,6 +18,7 @@ Usage Guidelines:
- START and END are optional flags, if set, it will only process nodes within the range.
- MARK_EXTERNALS is an optional flag, if true, it will mark external refs as external, downside is that it can take significantly longer to process, also size diff checking disabled when marking externals.
- TX_HASH is an optional param, used for fixing node version of a specific published node version. (Edgecase of multiple publishes with same manifestCid)
+ - COMMIT_ID is an optional param, used for fixing node version of a specific published node version.
- USER_EMAIL is only required for the fillPublic operation
- WORKING_TREE_URL is only required for the fillPublic operation, useful if a node is known to contain external cids, it can cut down the backfill time significantly for dags with external cids.
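By analogy with the documented TX_HASH flow, an illustrative invocation of the new COMMIT_ID parameter (uuid and commit ID are placeholders):

```bash
# Heal refs for one published version, addressed by its Ceramic commit ID.
OPERATION=heal NODE_UUID=<node-uuid> PUBLIC_REFS=true COMMIT_ID=<ceramic-commit-id> npm run script:fix-data-refs
```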
@@ -38,7 +39,7 @@ healAll: OPERATION=healAll PUBLIC_REFS=true MARK_EXTERNALS=true npm run script:fix-data-refs
const logger = parentLogger.child({ module: 'SCRIPTS::dataRefDoctor' });

main();
- function main() {
+ async function main() {
const {
operation,
nodeUuid,
@@ -48,40 +49,42 @@ function main() {
end,
markExternals,
txHash,
+ commitId,
userEmail,
workingTreeUrl,
newNodeUuid,
} = getOperationEnvs();
const startIterator = isNaN(start as any) ? undefined : parseInt(start);
const endIterator = isNaN(end as any) ? undefined : parseInt(end);

switch (operation) {
case 'validate':
if (!nodeUuid && !manifestCid) return logger.error('Missing NODE_UUID or MANIFEST_CID');
- validateDataReferences({ nodeUuid, manifestCid, publicRefs, markExternals, txHash });
+ await validateDataReferences({ nodeUuid, manifestCid, publicRefs, markExternals, txHash, commitId });
break;
case 'heal':
if (!nodeUuid && !manifestCid) return logger.error('Missing NODE_UUID or MANIFEST_CID');
- validateAndHealDataRefs({ nodeUuid, manifestCid, publicRefs, markExternals, txHash });
+ await validateAndHealDataRefs({ nodeUuid, manifestCid, publicRefs, markExternals, txHash, commitId });
break;
case 'validateAll':
- dataRefDoctor(false, publicRefs, startIterator, endIterator, markExternals);
+ await dataRefDoctor({ heal: false, publicRefs, start: startIterator, end: endIterator, markExternals });
break;
case 'healAll':
- dataRefDoctor(true, publicRefs, startIterator, endIterator, markExternals);
+ await dataRefDoctor({ heal: true, publicRefs, start: startIterator, end: endIterator, markExternals });
break;
case 'fillPublic':
if (!nodeUuid && !userEmail) return logger.error('Missing NODE_UUID or USER_EMAIL');
- fillPublic(nodeUuid, userEmail, workingTreeUrl);
+ await fillPublic(nodeUuid, userEmail, workingTreeUrl);
break;
case 'clonePrivateNode':
if (!nodeUuid && !newNodeUuid) return logger.error('Missing NODE_UUID or NEW_NODE_UUID');
- clonePrivateNode(nodeUuid, newNodeUuid);
+ await clonePrivateNode(nodeUuid, newNodeUuid);
break;
default:
logger.error('Invalid operation, valid operations include: validate, heal, validateAll, healAll');
return;
}
+ logger.info('DataRefDr has finished running');
+ process.exit(0);
}

function getOperationEnvs() {
@@ -95,19 +98,22 @@ function getOperationEnvs() {
end: process.env.END,
markExternals: process.env.MARK_EXTERNALS?.toLowerCase() === 'true' ? true : false,
txHash: process.env.TX_HASH || null,
+ commitId: process.env.COMMIT_ID || null,
workingTreeUrl: process.env.WORKING_TREE_URL || null,
userEmail: process.env.USER_EMAIL || null,
};
}

+ type DataRefDoctorArgs = {
+   heal: boolean;
+   publicRefs: boolean;
+   start?: number;
+   end?: number;
+   markExternals?: boolean;
+ };

//todo: add public handling
- async function dataRefDoctor(
-   heal: boolean,
-   publicRefs: boolean,
-   start?: number,
-   end?: number,
-   markExternals?: boolean,
- ) {
+ async function dataRefDoctor({ heal, publicRefs, start, end, markExternals }: DataRefDoctorArgs) {
const nodes = await prisma.node.findMany({
orderBy: {
id: 'asc',
@@ -133,11 +139,14 @@
`[DataRefDoctor]Processing node: ${nodes[i].id}, found versions indexed: ${totalVersionsIndexed}, for nodeUuid: ${node.uuid}`,
);
for (let nodeVersIdx = 0; nodeVersIdx < totalVersionsIndexed; nodeVersIdx++) {
- logger.info(
-   `[DataRefDoctor]Processing indexed version: ${nodeVersIdx}, with txHash: ${indexedNode.versions[nodeVersIdx]?.id}`,
- );
const hexCid = indexedNode.versions[nodeVersIdx]?.cid || indexedNode.recentCid;
const txHash = indexedNode.versions[nodeVersIdx]?.id;
+ const commitId = indexedNode.versions[nodeVersIdx]?.commitId;
+ const publishIdentifier = commitId || txHash;

+ logger.info(
+   `[DataRefDoctor]Processing indexed version: ${nodeVersIdx}, with publishIdentifier: ${publishIdentifier}`,
+ );
const manifestCid = hexToCid(hexCid);
if (heal) {
await validateAndHealDataRefs({
@@ -146,9 +155,19 @@
publicRefs: true,
markExternals,
txHash,
+ commitId,
+ includeManifestRef: true,
});
} else {
- validateDataReferences({ nodeUuid: node.uuid, manifestCid, publicRefs: true, markExternals, txHash });
+ validateDataReferences({
+   nodeUuid: node.uuid,
+   manifestCid,
+   publicRefs: true,
+   markExternals,
+   txHash,
+   commitId,
+   includeManifestRef: true,
+ });
}
}
}
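The identifier logic introduced above boils down to "use the Ceramic commitId when the indexed version has one, otherwise fall back to the legacy transaction hash". A minimal sketch of that fallback, assuming the indexed-version shape this script works with (`id` carrying the tx hash is an assumption from the surrounding code, not a documented API):

```ts
// Assumed shape of an indexed version from getIndexedResearchObjects:
// `id` is the legacy tx hash, `commitId` the Ceramic stream commit.
type IndexedVersion = { id?: string; cid: string; commitId?: string };

// Mirrors `const publishIdentifier = commitId || txHash` above.
function publishIdentifierFor(version: IndexedVersion): string | undefined {
  return version.commitId || version.id;
}
```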
@@ -159,13 +178,15 @@
manifestCid: node.manifestUrl,
publicRefs: false,
markExternals,
+ includeManifestRef: true,
});
} else {
await validateDataReferences({
nodeUuid: node.uuid,
manifestCid: node.manifestUrl,
publicRefs: false,
markExternals,
+ includeManifestRef: true,
});
}
}
@@ -179,22 +200,29 @@ async function fillPublic(nodeUuid: string, userEmail: string, workingTreeUrl?:
const user = await prisma.user.findUnique({ where: { email: userEmail } });
if (!user) return logger.error(`[FillPublic] Failed to find user with email: ${userEmail}`);

- if (!nodeUuid.endsWith('.')) nodeUuid += '.';
+ nodeUuid = ensureUuidEndsWithDot(nodeUuid);
const { researchObjects } = await getIndexedResearchObjects([nodeUuid]);
- if (!researchObjects.length)
-   logger.error(`[FillPublic] Failed to resolve any public nodes with the uuid: ${nodeUuid}`);
+ if (!researchObjects.length) {
+   logger.error(
+     { nodeUuid, researchObjects },
+     `[FillPublic] Failed to resolve any published nodes with the uuid: ${nodeUuid}, aborting script`,
+   );
+   return;
+ }

const indexedNode = researchObjects[0];
const latestHexCid = indexedNode.recentCid;
const latestManifestCid = hexToCid(latestHexCid);
const manifestUrl = cleanupManifestUrl(latestManifestCid);
const latestManifest = await (await axios.get(manifestUrl)).data;

- if (!latestManifest)
-   return logger.error(
+ if (!latestManifest) {
+   logger.error(
{ manifestUrl, latestManifestCid },
- `[FillPublic] Failed to retrieve manifest from ipfs cid: ${latestManifestCid}`,
+ `[FillPublic] Failed to retrieve manifest from ipfs cid: ${latestManifestCid}, aborting script`,
);
+ return;
+ }

const title = '[IMPORTED NODE]' + latestManifest.title || 'Imported Node';
let node = await prisma.node.findUnique({ where: { uuid: nodeUuid } });
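The inline `endsWith('.')` check is replaced by the shared ensureUuidEndsWithDot helper from src/utils.ts. Judging by the line it replaces, the helper is presumably equivalent to this sketch (an assumption, not the helper's actual source):

```ts
// Hedged reconstruction; mirrors the removed
// `if (!nodeUuid.endsWith('.')) nodeUuid += '.';` check.
function ensureUuidEndsWithDot(uuid: string): string {
  return uuid.endsWith('.') ? uuid : uuid + '.';
}
```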
@@ -214,18 +242,26 @@
const totalVersionsIndexed = indexedNode.versions.length || 0;
try {
for (let nodeVersIdx = 0; nodeVersIdx < totalVersionsIndexed; nodeVersIdx++) {
- logger.info(
-   `[DataRefDoctor]Processing indexed version: ${nodeVersIdx}, with txHash: ${indexedNode.versions[nodeVersIdx]?.id}`,
- );
const hexCid = indexedNode.versions[nodeVersIdx]?.cid || indexedNode.recentCid;
const txHash = indexedNode.versions[nodeVersIdx]?.id;
+ const commitId = indexedNode.versions[nodeVersIdx]?.commitId;
+ const commitIdOrTxHash = commitId || txHash;

+ logger.info(
+   `[DataRefDoctor]Processing indexed version: ${nodeVersIdx}, with ${commitId ? 'commitId:' : 'txHash'}: ${commitIdOrTxHash}`,
+ );
const manifestCid = hexToCid(hexCid);

+ const nodeVersionPublishIdentifiers = {
+   ...(txHash && { transactionId: txHash }),
+   ...(commitId && { commitId }),
+ };

const nodeVersion = await prisma.nodeVersion.create({
data: {
nodeId: node.id,
manifestUrl: manifestCid,
- transactionId: txHash,
+ ...nodeVersionPublishIdentifiers,
},
});

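The conditional spreads in nodeVersionPublishIdentifiers ensure the created nodeVersion row only carries whichever publish identifier actually exists. A self-contained sketch of the idiom (values are illustrative placeholders):

```ts
const txHash: string | null = null;
const commitId: string | null = 'k2t6wz...'; // hypothetical Ceramic commit ID

// Spreading null (or false) into an object literal is a no-op, so only the
// identifier that is present ends up on the record.
const identifiers = {
  ...(txHash && { transactionId: txHash }),
  ...(commitId && { commitId }),
};
console.log(identifiers); // -> { commitId: 'k2t6wz...' }
```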
@@ -242,7 +278,7 @@
};
logger.info(
{ manifestEntry },
- `[DataRefDoctor] Manifest entry being created for indexed version ${nodeVersIdx}, with txHash: ${indexedNode.versions[nodeVersIdx]?.id}`,
+ `[DataRefDoctor] Manifest entry being created for indexed version ${nodeVersIdx}, with publishIdentifier: ${commitIdOrTxHash}`,
);
await prisma.publicDataReference.create({ data: manifestEntry });

@@ -252,10 +288,11 @@
manifestCid,
publicRefs: true,
txHash,
+ commitId,
workingTreeUrl,
});
logger.info(
- `[DataRefDoctor]Successfully processed indexed node v: ${nodeVersIdx}, with txHash: ${indexedNode.versions[nodeVersIdx]?.id}, under user: ${user.email}`,
+ `[DataRefDoctor]Successfully processed indexed node v: ${nodeVersIdx}, with publishIdentifier: ${commitIdOrTxHash}, under user: ${user.email}`,
);
}
logger.info(`[FillPublic] Successfully backfilled data refs for public node: ${nodeUuid}`);
@@ -272,7 +309,7 @@
totalVersionsIndexed,
indexedNode,
},
- `[FillPublic] Failed to backfill data refs for public node: ${nodeUuid}, error`,
+ `[FillPublic] Failed to backfill data refs for public node: ${nodeUuid}`,
);
}
}
1 change: 0 additions & 1 deletion desci-server/src/services/user.ts
@@ -153,7 +153,6 @@ export async function connectOrcidToUserIfPossible(
logger.info({ fn: 'orcidCheck', user }, `Requesting user ${user}`);
if (!user.orcid || user.orcid === orcid) {
let nodeConnect: Awaited<ReturnType<typeof setOrcidForUser>>;
- debugger;
if (!user.orcid || !(await isAuthTokenSetForUser(user.id))) {
nodeConnect = await setOrcidForUser(user.id, orcid, {
accessToken,
(diff for the fourth changed file did not load)
