Skip to content

Commit

Permalink
Merge pull request #732 from desci-labs/cloudflare-worker-migration
Browse files Browse the repository at this point in the history
Cloudflare worker migration
  • Loading branch information
shadrach-tayo authored Dec 21, 2024
2 parents 9956f50 + 2dbc024 commit caff95d
Show file tree
Hide file tree
Showing 22 changed files with 246 additions and 516 deletions.
2 changes: 1 addition & 1 deletion .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,4 @@ OPEN_ALEX_DATABASE_URL=postgresql://username:password@host/database?schema=opena

CLOUDFLARE_WORKER_API=http://host.docker.internal:5445
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=false
ENABLE_WORKERS_API=true
2 changes: 1 addition & 1 deletion .env.test
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,4 @@ AUTOMATED_METADATA_API_KEY=

CLOUDFLARE_WORKER_API=http://host.docker.internal:5446
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=false
ENABLE_WORKERS_API=true
14 changes: 7 additions & 7 deletions .github/workflows/build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,13 @@ jobs:
echo -e "\nSENTRY_AUTH_TOKEN=$SENTRY_AUTH" >> desci-server/.env
cd desci-server && yarn build
- name: Set up sync server
run: |
cd sync-server && yarn --ignore-engines && ./scripts/build.sh test
if [ $? -ne 0 ]; then
exit 1
fi
echo "DISK USE:"; find / -maxdepth 1 -mindepth 1 -type d -exec du -hs {} \; 2>/dev/null
# - name: Set up sync server
# run: |
# cd sync-server && yarn --ignore-engines && ./scripts/build.sh test
# if [ $? -ne 0 ]; then
# exit 1
# fi
# echo "DISK USE:"; find / -maxdepth 1 -mindepth 1 -type d -exec du -hs {} \; 2>/dev/null

- name: Run tests
run: |
Expand Down
6 changes: 5 additions & 1 deletion .github/workflows/deploy-sync-server.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ on:
branches: # array of glob patterns matching against refs/heads. Optional; defaults to all
- main # triggers on pushes that contain changes
- develop

env:
API_TOKEN: ${{ secrets.API_TOKEN }}
jobs:
deploy:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -37,6 +38,8 @@ jobs:
environment: staging
workingDirectory: sync-server
wranglerVersion: 3.95.0
secrets: API_TOKEN

- name: Build & Deploy Worker (Production)
if: github.ref == 'refs/heads/main'
uses: cloudflare/wrangler-action@v3
Expand All @@ -46,3 +49,4 @@ jobs:
environment: production
workingDirectory: sync-server
wranglerVersion: 3.95.0
secrets: API_TOKEN
2 changes: 1 addition & 1 deletion desci-repo/.env.test
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ IPFS_RESOLVER_OVERRIDE=http://host.docker.internal:8089/ipfs

PARTY_SERVER_HOST=host.docker.internal:5446
PARTY_SERVER_TOKEN=auth-token
ENABLE_PARTYKIT_FEATURE=false
ENABLE_PARTYKIT_FEATURE=true
CLOUDFLARE_WORKER_API_SECRET=auth-token
6 changes: 3 additions & 3 deletions desci-repo/kubernetes/deployment_dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ spec:
export DESCI_SERVER_URL={{ .Data.DESCI_SERVER_URL }}
export PARTY_SERVER_HOST=nodes-dev-sync.desci.com
export PARTY_SERVER_TOKEN=auth-token
export ENABLE_PARTYKIT_FEATURE=false
export ENABLE_PARTYKIT_FEATURE=true
export PINO_LOG_LEVEL=info
{{- end -}}
labels:
Expand All @@ -66,8 +66,8 @@ spec:
ports:
- containerPort: 5484
name: server-api
- containerPort: 5445
name: ws-api
# - containerPort: 5445
# name: ws-api
resources:
limits:
cpu: '4.0'
Expand Down
6 changes: 3 additions & 3 deletions desci-repo/kubernetes/deployment_prod.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ spec:
export DESCI_SERVER_URL={{ .Data.DESCI_SERVER_URL }}
export PARTY_SERVER_HOST=nodes-sync.desci.com
export PARTY_SERVER_TOKEN=auth-token
export ENABLE_PARTYKIT_FEATURE=false
export ENABLE_PARTYKIT_FEATURE=true
export PINO_LOG_LEVEL=info
{{- end -}}
labels:
Expand All @@ -67,8 +67,8 @@ spec:
ports:
- containerPort: 5484
name: server-api
- containerPort: 5445
name: ws-api
# - containerPort: 5445
# name: ws-api
resources:
limits:
cpu: '4'
Expand Down
4 changes: 2 additions & 2 deletions desci-repo/kubernetes/deployment_staging.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ spec:
ports:
- containerPort: 5484
name: server-api
- containerPort: 5445
name: ws-api
# - containerPort: 5445
# name: ws-api
resources:
limits:
cpu: '2'
Expand Down
221 changes: 107 additions & 114 deletions desci-repo/src/controllers/nodes/documents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import os from 'os';
import { Doc } from '@automerge/automerge';
import { AutomergeUrl, DocHandleEphemeralMessagePayload, DocumentId, PeerId, Repo } from '@automerge/automerge-repo';
import { ManifestActions, ResearchObjectV1 } from '@desci-labs/desci-models';
import { Request, Response } from 'express';
import { json, Request, Response } from 'express';
import WebSocket from 'isomorphic-ws';
import { ZodError } from 'zod';

Expand Down Expand Up @@ -48,25 +48,44 @@ export const createNodeDocument = async function (req: Request, res: Response) {
return;
}

let uuid = req.body.uuid;
const manifest = req.body.manifest;
uuid = ensureUuidEndsWithDot(uuid);
logger.info({ peerId: backendRepo.networkSubsystem.peerId, uuid }, '[Backend REPO]:');
const handle = backendRepo.create<ResearchObjectDocument>();
handle.change(
(d) => {
d.manifest = manifest;
d.uuid = uuid;
d.driveClock = Date.now().toString();
},
{ message: 'Init Document', time: Date.now() },
);

logger.trace({ peerId: backendRepo.networkSubsystem.peerId, uuid }, 'Document Created');

const document = await handle.doc();

res.status(200).send({ ok: true, document, documentId: handle.documentId });
const { uuid, manifest } = req.body;
logger.trace({ protocol, PARTY_SERVER_HOST, ENABLE_PARTYKIT_FEATURE }, 'ENV');
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/api/documents`, {
method: 'POST',
body: JSON.stringify({ uuid, manifest }),
headers: {
'x-api-key': process.env.CLOUDFLARE_WORKER_API_SECRET ?? 'auth-token',
},
});
if (response.status === 200) {
const data = await response.json();

logger.trace({ uuid }, 'Document Created');
res.status(200).send({ ok: true, ...data });
} else {
res.status(response.status).send({ ok: false });
}
} else {
let uuid = req.body.uuid;
const manifest = req.body.manifest;
uuid = ensureUuidEndsWithDot(uuid);
logger.info({ peerId: backendRepo.networkSubsystem.peerId, uuid }, '[Backend REPO]:');
const handle = backendRepo.create<ResearchObjectDocument>();
handle.change(
(d) => {
d.manifest = manifest;
d.uuid = uuid;
d.driveClock = Date.now().toString();
},
{ message: 'Init Document', time: Date.now() },
);

logger.trace({ peerId: backendRepo.networkSubsystem.peerId, uuid }, 'Document Created');

const document = await handle.doc();
res.status(200).send({ ok: true, document, documentId: handle.documentId });
}
} catch (err) {
logger.error({ err }, '[Error]::createNodeDocument');
res.status(500).send({ ok: false, message: JSON.stringify(err) });
Expand All @@ -83,25 +102,24 @@ export const getLatestNodeManifest = async function (req: Request, res: Response
// fast track call if documentId is available
console.log('[getLatestNodeManifest]', { documentId, ENABLE_PARTYKIT_FEATURE });
if (documentId) {
// if (ENABLE_PARTYKIT_FEATURE) {
// const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/api/documents?documentId=${documentId}`, {
// // body: JSON.stringify({ uuid, documentId }),
// headers: {
// 'x-api-key': PARTY_SERVER_TOKEN!,
// },
// });
// const data = (await response.json()) as { document: ResearchObjectV1 };

// logger.trace({ document: !!data.document, ENABLE_PARTYKIT_FEATURE }, 'Document Retrieved');
// res.status(200).send({ ok: true, document: data.document });
// return;
// } else {
const document = await getDocument(documentId as DocumentId);
console.log('[getLatestNodeManifest::document]', { document });
if (document) {
res.status(200).send({ ok: true, document });
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/parties/automerge/${documentId}`, {
headers: {
'x-api-key': PARTY_SERVER_TOKEN!,
},
});
const data = (await response.json()) as { document: ResearchObjectV1 };

logger.trace({ document: !!data.document, ENABLE_PARTYKIT_FEATURE }, 'Document Retrieved');
res.status(200).send({ ok: true, document: data.document });
return;
// }
} else {
const document = await getDocument(documentId as DocumentId);
console.log('[getLatestNodeManifest::document]', { document });
if (document) {
res.status(200).send({ ok: true, document });
return;
}
}
}

Expand All @@ -127,24 +145,24 @@ export const getLatestNodeManifest = async function (req: Request, res: Response
return;
}

// if (ENABLE_PARTYKIT_FEATURE) {
// const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/api/documents?documentId=${documentId}`, {
// headers: {
// 'x-api-key': PARTY_SERVER_TOKEN!,
// },
// });
// const data = (await response.json()) as { document: ResearchObjectV1 };

// logger.trace({ document: !!data.document, ENABLE_PARTYKIT_FEATURE }, 'Document Retrieved');
// res.status(200).send({ ok: true, document: data.document });
// return;
// } else {
const document = await getDocument(node.manifestDocumentId as DocumentId);

logger.trace({ document: !!document, ENABLE_PARTYKIT_FEATURE }, 'return DOCUMENT');
res.status(200).send({ ok: true, document });
return;
// }
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/parties/automerge/${documentId}`, {
headers: {
'x-api-key': PARTY_SERVER_TOKEN!,
},
});
const data = (await response.json()) as { document: ResearchObjectV1 };

logger.trace({ document: !!data.document, ENABLE_PARTYKIT_FEATURE }, 'Document Retrieved');
res.status(200).send({ ok: true, document: data.document });
return;
} else {
const document = await getDocument(node.manifestDocumentId as DocumentId);

logger.trace({ document: !!document, ENABLE_PARTYKIT_FEATURE }, 'return DOCUMENT');
res.status(200).send({ ok: true, document });
return;
}
} catch (err) {
logger.error({ err }, 'Error');
res.status(500).send({ ok: false, message: JSON.stringify(err) });
Expand All @@ -166,39 +184,26 @@ export const dispatchDocumentChange = async function (req: RequestWithNode, res:
res.status(400).send({ ok: false, message: 'No actions to dispatch' });
return;
}

// const repo = new Repo({
// peerId: `repo-server-${hostname}` as PeerId,
// // Since this is a server, we don't share generously — meaning we only sync documents they already
// // know about and can ask for by ID.
// sharePolicy: async () => true,
// });
// const adapter = new PartykitNodeWsAdapter({
// host: PARTY_SERVER_HOST!,
// party: 'automerge',
// room: documentId,
// query: { auth: PARTY_SERVER_TOKEN, documentId },
// protocol: IS_DEV || IS_TEST ? 'ws' : 'wss',
// WebSocket: WebSocket,
// });
// repo.networkSubsystem.addNetworkAdapter(adapter);
// await repo.networkSubsystem.whenReady();

// const handle = repo.find<ResearchObjectDocument>(getAutomergeUrl(documentId));
// handle.broadcast([documentId, { type: 'dispatch-changes', actions }]);

// // await new Promise((resolve) => setTimeout(resolve, 2000));
// // console.log('[TIMEOUT]', { documentId, actions });
// logger.trace({ documentId, actions }, 'Actions');
logger.trace({ documentId, actions }, 'Actions');

let document: Doc<ResearchObjectDocument> | undefined;

const dispatchChange = await getDocumentUpdater(documentId);

// await new Promise((resolve) => setTimeout(resolve, 5000));

for (const action of actions) {
document = await dispatchChange(action);
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/parties/automerge/${documentId}`, {
method: 'POST',
body: JSON.stringify({ actions, documentId }),
headers: {
'x-api-key': PARTY_SERVER_TOKEN!,
},
});
const data = (await response.json()) as { document: ResearchObjectDocument };
document = data.document;
} else {
const dispatchChange = await getDocumentUpdater(documentId);

for (const action of actions) {
document = await dispatchChange(action);
}
}

if (!document) {
Expand Down Expand Up @@ -234,36 +239,24 @@ export const dispatchDocumentActions = async function (req: RequestWithNode, res
const validatedActions = await actionsSchema.parseAsync(actions);
logger.trace({ validatedActions }, 'Actions validated');

// const handle = await getDocumentHandle(documentId);
// const repo = new Repo({
// peerId: `repo-server-${hostname}` as PeerId,
// // Since this is a server, we don't share generously — meaning we only sync documents they already
// // know about and can ask for by ID.
// sharePolicy: async () => true,
// });
// const adapter = new PartykitNodeWsAdapter({
// host: PARTY_SERVER_HOST!,
// party: 'automerge',
// room: documentId,
// query: { auth: PARTY_SERVER_TOKEN, documentId },
// protocol: IS_DEV || IS_TEST ? 'ws' : 'wss',
// WebSocket: WebSocket,
// });
// repo.networkSubsystem.addNetworkAdapter(adapter);
// await repo.networkSubsystem.whenReady();

// const handle = repo.find<ResearchObjectDocument>(getAutomergeUrl(documentId));
// handle.broadcast([documentId, { type: 'dispatch-action', actions }]);

// logger.trace({ documentId, validatedActions }, 'Actions');

let document: Doc<ResearchObjectDocument> | undefined;

const dispatchChange = await getDocumentUpdater(documentId);
// await new Promise((resolve) => setTimeout(resolve, 300));

for (const action of actions) {
document = await dispatchChange(action);
if (ENABLE_PARTYKIT_FEATURE) {
const response = await fetch(`${protocol}${PARTY_SERVER_HOST}/parties/automerge/${documentId}`, {
method: 'POST',
body: JSON.stringify({ actions, documentId }),
headers: {
'x-api-key': PARTY_SERVER_TOKEN!,
},
});
const data = (await response.json()) as { document: ResearchObjectDocument };
document = data.document;
} else {
const dispatchChange = await getDocumentUpdater(documentId);

for (const action of actions) {
document = await dispatchChange(action);
}
}

logger.trace({ actions, document }, '[Post Action]');
Expand Down
2 changes: 1 addition & 1 deletion desci-server/kubernetes/deployment_dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ spec:
export OPEN_ALEX_DATABASE_URL="{{ .Data.OPEN_ALEX_DATABASE_URL }}"
CLOUDFLARE_WORKER_API=https://nodes-dev-sync.desci.com
CLOUDFLARE_WORKER_API_SECRET=auth-token
ENABLE_WORKERS_API=false
ENABLE_WORKERS_API=true
export DEBUG_TEST=0;
echo "appfinish";
{{- end -}}
Expand Down
Loading

0 comments on commit caff95d

Please sign in to comment.