Skip to content

Commit

Permalink
Work on federation
Browse files Browse the repository at this point in the history
  • Loading branch information
CPlusPatch committed Nov 27, 2023
1 parent b27d421 commit 8b442c2
Show file tree
Hide file tree
Showing 8 changed files with 254 additions and 2 deletions.
Binary file modified bun.lockb
Binary file not shown.
8 changes: 7 additions & 1 deletion config/config.example.toml
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
[database]
host = "localhost"
port = 48654
port = 5432
username = "lysand"
password = "password123"
database = "lysand"

[redis.queue]
host = "localhost"
post = 6379
password = ""
database = 0

[http]
base_url = "https://lysand.social"
bind = "http://localhost"
Expand Down
12 changes: 11 additions & 1 deletion database/datasource.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Queue } from "bullmq";
import { getConfig } from "../utils/config";
import { PrismaClient } from "@prisma/client";

Expand All @@ -7,4 +8,13 @@ const client = new PrismaClient({
datasourceUrl: `postgresql://${config.database.username}:${config.database.password}@${config.database.host}:${config.database.port}/${config.database.database}`,
});

export { client };
const federationQueue = new Queue("federation", {
connection: {
host: config.redis.queue.host,
port: config.redis.queue.port,
password: config.redis.queue.password,
db: config.redis.queue.database,
},
});

export { client, federationQueue };
197 changes: 197 additions & 0 deletions database/entities/Queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
import { getConfig } from "@config";
import { Worker } from "bullmq";
import { client, federationQueue } from "~database/datasource";
import {
statusAndUserRelations,
statusToLysand,
type StatusWithRelations,
} from "./Status";
import type { User } from "@prisma/client";

const config = getConfig();

export const federationWorker = new Worker(
"federation",
async job => {
await job.updateProgress(0);

switch (job.name) {
case "federation": {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const statusId = job.data.id as string;

const status = await client.status.findUnique({
where: { id: statusId },
include: statusAndUserRelations,
});

if (!status) return;

// Only get remote users that follow the author of the status, and the remote mentioned users
const peopleToSendTo = await client.user.findMany({
where: {
OR: [
["public", "unlisted", "private"].includes(
status.visibility
)
? {
relationships: {
some: {
subjectId: status.authorId,
following: true,
},
},
instanceId: {
not: null,
},
}
: {},
// Mentioned users
{
id: {
in: status.mentions.map(m => m.id),
},
instanceId: {
not: null,
},
},
],
},
});

let peopleDone = 0;

// Spawn sendToServer job for each user
for (const person of peopleToSendTo) {
await federationQueue.add("sendToServer", {
id: statusId,
user: person,
});

peopleDone++;

await job.updateProgress(
Math.round((peopleDone / peopleToSendTo.length) * 100)
);
}
break;
}
case "sendToServer": {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const statusId = job.data.id as string;
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const user = job.data.user as User;

const status = await client.status.findUnique({
where: { id: statusId },
include: statusAndUserRelations,
});

if (!status) return;

const response = await federateStatusTo(
status,
status.author,
user
);

if (response.status !== 200) {
throw new Error(
`Federation error: ${response.status} ${response.statusText}`
);
}

break;
}
}

await job.updateProgress(100);

return true;
},
{
connection: {
host: config.redis.queue.host,
port: config.redis.queue.port,
password: config.redis.queue.password,
db: config.redis.queue.database,
},
removeOnComplete: {
count: 400,
},
removeOnFail: {
count: 3000,
},
}
);

/**
* Convert a string into an ArrayBuffer
* from https://developers.google.com/web/updates/2012/06/How-to-convert-ArrayBuffer-to-and-from-String
*/
export const str2ab = (str: string) => {
const buf = new ArrayBuffer(str.length);
const bufView = new Uint8Array(buf);
for (let i = 0, strLen = str.length; i < strLen; i++) {
bufView[i] = str.charCodeAt(i);
}
return buf;
};

export const federateStatusTo = async (
status: StatusWithRelations,
sender: User,
user: User
) => {
const privateKey = await crypto.subtle.importKey(
"pkcs8",
str2ab(atob(user.privateKey ?? "")),
"Ed25519",
false,
["sign"]
);

const digest = await crypto.subtle.digest(
"SHA-256",
new TextEncoder().encode("request_body")
);

// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
const userInbox = new URL((user.endpoints as any).inbox);

const date = new Date();

const signature = await crypto.subtle.sign(
"Ed25519",
privateKey,
new TextEncoder().encode(
`(request-target): post ${userInbox.pathname}\n` +
`host: ${userInbox.host}\n` +
`date: ${date.toUTCString()}\n` +
`digest: SHA-256=${btoa(
String.fromCharCode(...new Uint8Array(digest))
)}\n`
)
);

const signatureBase64 = btoa(
String.fromCharCode(...new Uint8Array(signature))
);

return fetch(userInbox, {
method: "POST",
headers: {
"Content-Type": "application/json",
Date: date.toUTCString(),
Origin: config.http.base_url,
Signature: `keyId="${sender.uri}",algorithm="ed25519",headers="(request-target) host date digest",signature="${signatureBase64}"`,
},
body: JSON.stringify(statusToLysand(status)),
});
};

export const addStatusFederationJob = async (statusId: string) => {
await federationQueue.add("federation", {
id: statusId,
});
};
1 change: 1 addition & 0 deletions database/entities/Status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ export const statusToAPI = async (
id: status.id,
in_reply_to_id: status.inReplyToPostId || null,
in_reply_to_account_id: status.inReplyToPost?.authorId || null,
// @ts-expect-error Prisma TypeScript types dont include relations
account: userToAPI(status.author),
created_at: new Date(status.createdAt).toISOString(),
application: status.application
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
"@aws-sdk/client-s3": "^3.429.0",
"@prisma/client": "^5.6.0",
"blurhash": "^2.0.5",
"bullmq": "^4.14.2",
"chalk": "^5.3.0",
"eventemitter3": "^5.0.1",
"html-to-text": "^9.0.5",
Expand Down
20 changes: 20 additions & 0 deletions prisma/migrations/20231127010521_add_notifications/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- CreateTable
CREATE TABLE "Notification" (
"id" UUID NOT NULL DEFAULT uuid_generate_v7(),
"type" TEXT NOT NULL,
"createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"notifiedId" UUID NOT NULL,
"accountId" UUID NOT NULL,
"statusId" UUID,

CONSTRAINT "Notification_pkey" PRIMARY KEY ("id")
);

-- AddForeignKey
ALTER TABLE "Notification" ADD CONSTRAINT "Notification_notifiedId_fkey" FOREIGN KEY ("notifiedId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "Notification" ADD CONSTRAINT "Notification_accountId_fkey" FOREIGN KEY ("accountId") REFERENCES "User"("id") ON DELETE CASCADE ON UPDATE CASCADE;

-- AddForeignKey
ALTER TABLE "Notification" ADD CONSTRAINT "Notification_statusId_fkey" FOREIGN KEY ("statusId") REFERENCES "Status"("id") ON DELETE CASCADE ON UPDATE CASCADE;
17 changes: 17 additions & 0 deletions utils/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,15 @@ export interface ConfigType {
database: string;
};

redis: {
queue: {
host: string;
port: number;
password: string;
database: number;
};
};

http: {
base_url: string;
bind: string;
Expand Down Expand Up @@ -145,6 +154,14 @@ export const configDefaults: ConfigType = {
password: "postgres",
database: "lysand",
},
redis: {
queue: {
host: "localhost",
port: 6379,
password: "",
database: 0,
},
},
instance: {
banner: "",
description: "",
Expand Down

0 comments on commit 8b442c2

Please sign in to comment.