Skip to content

Commit

Permalink
Merge pull request #723 from desci-labs/sync-dispatches
Browse files Browse the repository at this point in the history
offload repo and automerge apis to cloudflare workers
  • Loading branch information
shadrach-tayo authored Dec 17, 2024
2 parents 1afbe39 + e5cf66f commit e756430
Show file tree
Hide file tree
Showing 17 changed files with 1,180 additions and 60 deletions.
12 changes: 8 additions & 4 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ RUN=1
# Enable google api functionalities
GOOGLE_CLIENT_ID=
GOOGLE_CLIENT_SECRET= # Unnecessary for now, not doing serverside 2step
GOOGLE_DEV_API_KEY= # Unnecessary for now, not doing serverside 2step
GOOGLE_DEV_API_KEY= # Unnecessary for now, not doing serverside 2step

## Configure RPC nodes (open an issue/ping us to access DeSci Labs' nodes)
ETHEREUM_RPC_URL=http://host.docker.internal:8545
Expand All @@ -134,7 +134,7 @@ MAX_LOCK_TIME=3600 # 1 hour
DOI_PREFIX=10.62891
CROSSREF_DOI_URL=https://doi.org

# Cross ref api
# Cross ref api
CROSSREF_METADATA_API=https://test.crossref.org/servlet/deposit
CROSSREF_ADMIN_API=https://test.crossref.org
CROSSREF_EMAIL=
Expand Down Expand Up @@ -162,5 +162,9 @@ ES_DB_NAME=
ES_DB_USER=
ES_DB_PASSWORD=

### open Alex Database - Postgres
OPEN_ALEX_DATABASE_URL=postgresql://username:password@host/database?schema=openalex
### open Alex Database - Postgres
OPEN_ALEX_DATABASE_URL=postgresql://username:password@host/database?schema=openalex

CLOUDFLARE_WORKER_API=http://host.docker.internal:5445
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=true
10 changes: 7 additions & 3 deletions .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ REDIS_PORT=6379
REDIS_URL=redis://host.docker.internal:6379

# LOCAL DEV
# http://localhost:1984
# http://localhost:1984
#
# LIVE:
# host: 'arweave.net',
Expand Down Expand Up @@ -77,7 +77,7 @@ REPO_SERVER_URL=http://host.docker.internal:5485
DOI_PREFIX=10.62891
CROSSREF_DOI_URL=https://doi.org

# Cross ref api
# Cross ref api
CROSSREF_METADATA_API=https://test.crossref.org/servlet/deposit
CROSSREF_ADMIN_API=https://test.crossref.org
CROSSREF_API_KEY=
Expand All @@ -90,4 +90,8 @@ CROSSREF_NOTIFY_ENDPOINT=endpoint

# Automated metadata
AUTOMATED_METADATA_API=
AUTOMATED_METADATA_API_KEY=
AUTOMATED_METADATA_API_KEY=

CLOUDFLARE_WORKER_API=http://host.docker.internal:5446
CLOUDFLARE_WORKER_API_SECRET=test-api-secret
ENABLE_WORKERS_API=true
2 changes: 1 addition & 1 deletion .github/workflows/deploy-sync-server.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: Deploy Worker
on:
pull_request:
push:
paths:
- .github/workflows/deploy-sync-server.yaml
- sync-server/**
Expand Down
44 changes: 36 additions & 8 deletions desci-repo/src/controllers/nodes/documents.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Doc } from '@automerge/automerge';
import { AutomergeUrl, DocumentId } from '@automerge/automerge-repo';
import { ManifestActions } from '@desci-labs/desci-models';
import { ManifestActions, ResearchObjectV1 } from '@desci-labs/desci-models';
import { Request, Response } from 'express';
import { ZodError } from 'zod';

Expand All @@ -13,7 +13,7 @@ import { ResearchObjectDocument } from '../../types.js';
import { actionsSchema } from '../../validators/schema.js';

import { ensureUuidEndsWithDot } from './utils.js';
import { IS_DEV, IS_TEST, PARTY_SERVER_HOST, PARTY_SERVER_TOKEN } from '../../config.js';
import { ENABLE_PARTYKIT_FEATURE, IS_DEV, IS_TEST, PARTY_SERVER_HOST, PARTY_SERVER_TOKEN } from '../../config.js';

const protocol = IS_TEST || IS_DEV ? 'http://' : 'https://';

Expand Down Expand Up @@ -68,10 +68,24 @@ export const getLatestNodeManifest = async function (req: Request, res: Response
// todo: add support for documentId params and skip querying node
// fast track call if documentId is available
if (documentId) {
const document = await getDocument(documentId as DocumentId);
if (document) {
res.status(200).send({ ok: true, document });
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/api/documents?documentId=${documentId}`, {
// body: JSON.stringify({ uuid, documentId }),
headers: {
'x-api-key': PARTY_SERVER_TOKEN!,
},
});
const data = (await response.json()) as { document: ResearchObjectV1 };

logger.trace({ document: !!data.document, ENABLE_PARTYKIT_FEATURE }, 'Document Retrieved');
res.status(200).send({ ok: true, document: data.document });
return;
} else {
const document = await getDocument(documentId as DocumentId);
if (document) {
res.status(200).send({ ok: true, document });
return;
}
}
}

Expand All @@ -97,10 +111,24 @@ export const getLatestNodeManifest = async function (req: Request, res: Response
return;
}

const document = await getDocument(node.manifestDocumentId as DocumentId);
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/api/documents?documentId=${documentId}`, {
headers: {
'x-api-key': PARTY_SERVER_TOKEN!,
},
});
const data = (await response.json()) as { document: ResearchObjectV1 };

logger.trace({ document: !!document }, 'return DOCUMENT');
res.status(200).send({ ok: true, document });
logger.trace({ document: !!data.document, ENABLE_PARTYKIT_FEATURE }, 'Document Retrieved');
res.status(200).send({ ok: true, document: data.document });
return;
} else {
const document = await getDocument(node.manifestDocumentId as DocumentId);

logger.trace({ document: !!document, ENABLE_PARTYKIT_FEATURE }, 'return DOCUMENT');
res.status(200).send({ ok: true, document });
return;
}
} catch (err) {
logger.error({ err }, 'Error');
res.status(500).send({ ok: false, message: JSON.stringify(err) });
Expand Down
3 changes: 3 additions & 0 deletions desci-server/kubernetes/deployment_dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ spec:
export ELASTIC_SEARCH_USER="{{ .Data.ELASTIC_SEARCH_USER }}"
export ELASTIC_SEARCH_PW="{{ .Data.ELASTIC_SEARCH_PW }}"
export OPEN_ALEX_DATABASE_URL="{{ .Data.OPEN_ALEX_DATABASE_URL }}"
CLOUDFLARE_WORKER_API=nodes-dev-sync.desci.com
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=true
export DEBUG_TEST=0;
echo "appfinish";
{{- end -}}
Expand Down
3 changes: 3 additions & 0 deletions desci-server/kubernetes/deployment_prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ spec:
export ELASTIC_SEARCH_USER="{{ .Data.ELASTIC_SEARCH_USER }}"
export ELASTIC_SEARCH_PW="{{ .Data.ELASTIC_SEARCH_PW }}"
export OPEN_ALEX_DATABASE_URL="{{ .Data.OPEN_ALEX_DATABASE_URL }}"
CLOUDFLARE_WORKER_API=nodes-sync.desci.com
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=true
export IGNORE_LINE=0;
export DEBUG_TEST=0;
echo "appfinish";
Expand Down
3 changes: 3 additions & 0 deletions desci-server/kubernetes/deployment_staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ spec:
export ELASTIC_SEARCH_USER="{{ .Data.ELASTIC_SEARCH_USER }}"
export ELASTIC_SEARCH_PW="{{ .Data.ELASTIC_SEARCH_PW }}"
export OPEN_ALEX_DATABASE_URL="{{ .Data.OPEN_ALEX_DATABASE_URL }}"
CLOUDFLARE_WORKER_API=nodes-sync.desci.com
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=true
export DEBUG_TEST=0;
echo "appfinish";
{{- end -}}
Expand Down
44 changes: 26 additions & 18 deletions desci-server/src/services/repoService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import { NodeUuid } from './manifestRepo.js';

const logger = parentLogger.child({ module: 'Repo Service' });

const cloudflareWorkerApi = process.env.CLOUDFLARE_WORKER_API;
const cloudflareWorkerApiSecret = process.env.CLOUDFLARE_WORKER_API_SECRET;
const enableWorkersApi = process.env.ENABLE_WORKERS_API == 'true';

type ApiResponse<B> = { ok: boolean } & B;

class RepoService {
Expand Down Expand Up @@ -36,21 +40,24 @@ class RepoService {
}

async dispatchAction(arg: { uuid: NodeUuid | string; documentId: DocumentId; actions: ManifestActions[] }) {
logger.info({ arg }, 'Disatch Changes');
logger.info({ arg, enableWorkersApi, cloudflareWorkerApi }, 'Disatch Changes');
const response = await this.#client.post<{ ok: boolean; document: ResearchObjectDocument }>(
`${this.baseUrl}/v1/nodes/documents/dispatch`,
enableWorkersApi
? `${cloudflareWorkerApi}/api/documents/dispatch`
: `${this.baseUrl}/v1/nodes/documents/dispatch`,
arg,
{
headers: {
'x-api-remote-traceid': (als.getStore() as any)?.traceId,
...(enableWorkersApi && { 'x-api-key': cloudflareWorkerApiSecret }),
},
},
);
logger.info({ arg, ok: response.data.ok }, 'Disatch Changes Response');
logger.trace({ arg, ok: response.data.ok }, 'Disatch Actions Response');
if (response.status === 200 && response.data.ok) {
return response.data.document;
} else {
// logger.info({ response: response.data }, 'Disatch Changes Response');
logger.trace({ response: response.data }, 'Disatch Actions Failed');
return null;
}
}
Expand All @@ -59,11 +66,14 @@ class RepoService {
logger.info({ arg }, 'Disatch Actions');
try {
const response = await this.#client.post<{ ok: boolean; document: ResearchObjectDocument }>(
`${this.baseUrl}/v1/nodes/documents/actions`,
enableWorkersApi
? `${cloudflareWorkerApi}/api/documents/actions`
: `${this.baseUrl}/v1/nodes/documents/actions`,
arg,
{
headers: {
'x-api-remote-traceid': (als.getStore() as any)?.traceId,
...(enableWorkersApi && { 'x-api-key': cloudflareWorkerApiSecret }),
},
},
);
Expand All @@ -83,19 +93,20 @@ class RepoService {
try {
const response = await this.#client.post<
ApiResponse<{ documentId: DocumentId; document: ResearchObjectDocument }>
>(`${this.baseUrl}/v1/nodes/documents`, arg, {
>(enableWorkersApi ? `${cloudflareWorkerApi}/api/documents` : `${this.baseUrl}/v1/nodes/documents`, arg, {
headers: {
'x-api-remote-traceid': (als.getStore() as any)?.traceId,
...(enableWorkersApi && { 'x-api-key': cloudflareWorkerApiSecret }),
},
});
logger.info({ response: response.data }, 'Create Draft Response');
if (response.status === 200 && response.data.ok) {
logger.info({ status: response.status, response: response.data }, 'Create Draft Response');
if (response.status === 200) {
return response.data;
} else {
return null;
}
} catch (err) {
logger.error({ err }, 'Create Draft Error');
logger.error({ err, enableWorkersApi, cloudflareWorkerApi }, 'Create Draft Error');
return null;
}
}
Expand All @@ -106,27 +117,24 @@ class RepoService {
return null;
}
try {
// const controller = new AbortController();
// setTimeout(() => {
// logger.trace('Abort request');
// controller.abort();
// }, arg.timeout ?? this.defaultTimeoutInMilliseconds);
logger.trace(
{ timout: arg.timeout || this.defaultTimeoutInMilliseconds, uuid: arg.uuid, documentId: arg.documentId },
'[getDraftDocument]',
);
const response = await this.#client.get<ApiResponse<{ document: ResearchObjectDocument }>>(
`${this.baseUrl}/v1/nodes/documents/draft/${arg.uuid}?documentId=${arg.documentId}`,
enableWorkersApi
? `${cloudflareWorkerApi}/api/documents?documentId=${arg.documentId}`
: `${this.baseUrl}/v1/nodes/documents/draft/${arg.uuid}?documentId=${arg.documentId}`,
{
headers: {
'x-api-remote-traceid': (als.getStore() as any)?.traceId,
...(enableWorkersApi && { 'x-api-key': cloudflareWorkerApiSecret }),
},
// timeout: arg.timeout ?? this.defaultTimeoutInMilliseconds,
signal: AbortSignal.timeout(arg.timeout ?? this.defaultTimeoutInMilliseconds), // controller.signal,
signal: AbortSignal.timeout(arg.timeout ?? this.defaultTimeoutInMilliseconds),
timeoutErrorMessage: this.timeoutErrorMessage,
},
);
logger.info({ arg }, 'Retrieve Draft Document');
logger.info({ arg, doc: response.data }, 'Retrieve Draft Document');
if (response.status === 200 && response.data.ok) {
return response.data.document;
} else {
Expand Down
2 changes: 1 addition & 1 deletion desci-server/test/integration/automerge.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const createDraftNode = async (user: User, baseManifest: ResearchObjectV1, baseM
return { node: updatedNode || node, documentId: response?.documentId };
};

describe('Automerge Integration', () => {
describe.only('Automerge Integration', () => {
let user: User;
let unauthedUser: User;
// let node: Node;
Expand Down
2 changes: 1 addition & 1 deletion desci-server/test/integration/data.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const createDraftNode = async (user: User, baseManifest: ResearchObjectV1, baseM
return { node: updatedNode || node, documentId: response?.documentId };
};

describe('Data Controllers', () => {
describe.only('Data Controllers', () => {
let user: User;
let unauthedUser: User;
// let node: Node;
Expand Down
6 changes: 4 additions & 2 deletions sync-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,15 @@
"@automerge/automerge-repo-network-websocket": "^1.2.1",
"@automerge/automerge-repo-react-hooks": "^1.0.19",
"@cloudflare/workers-types": "^4.20241022.0",
"@desci-labs/desci-models": "^0.2.18",
"@desci-labs/desci-models": "^0.2.19",
"deep-equal": "^2.2.3",
"isomorphic-ws": "^5.0.0",
"partykit": "^0.0.111",
"partyserver": "^0.0.57",
"partysocket": "^1.0.2",
"pg": "^8.13.1",
"pino-std-serializers": "^7.0.0",
"ws": "^8.14.2"
"ws": "^8.14.2",
"zod": "^3.24.1"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class PostgresStorageAdapter implements StorageAdapterInterface {
async save(keyArray: StorageKey, binary: Uint8Array): Promise<void> {
const key = getKey(keyArray);
this.cache[key] = binary;
console.log('[save]', { key });

try {
await this.query(
`INSERT INTO "${this.tableName}" (key, value) VALUES ($1, $2) ON CONFLICT(key) DO UPDATE SET value = $2 RETURNING key`,
Expand Down Expand Up @@ -87,10 +87,7 @@ export class PostgresStorageAdapter implements StorageAdapterInterface {
}

private async loadRangeKeys(keyPrefix: string[]): Promise<string[]> {
console.log('LoadRange Keys', { keyPrefix });
const response = await this.query(`SELECT key FROM "${this.tableName}" WHERE key LIKE $1`, [`${keyPrefix}%`]);
// console.log({ keyPrefix, response: response?.length }, '[LOADED RANGE Keys]');

return response ? response.map((row) => row.key) : [];
}
}
Expand Down
Loading

0 comments on commit e756430

Please sign in to comment.