Skip to content

Commit

Permalink
feat: implement server using vanilla http
Browse files Browse the repository at this point in the history
  • Loading branch information
Deivu committed Dec 22, 2024
1 parent 0b69141 commit fb89832
Showing 1 changed file with 44 additions and 36 deletions.
80 changes: 44 additions & 36 deletions src/concurrency/ConcurrencyServer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import Fastify, { FastifyInstance, FastifyReply, FastifyRequest } from 'fastify';
import { ConcurrencyManager } from './ConcurrencyManager';
import { AddressInfo } from 'node:net';
import { ConcurrencyManager } from './ConcurrencyManager';
import Http from 'node:http';

/**
* Server that handles identify locks
Expand All @@ -10,7 +10,7 @@ export class ConcurrencyServer {
* Fastify instance of this server
* @private
*/
private readonly fastify: FastifyInstance;
private readonly server: Http.Server;
/**
* Concurrency manager for this server
* @private
Expand All @@ -22,21 +22,9 @@ export class ConcurrencyServer {
*/
private readonly password: string;
constructor(concurrency: number) {
this.fastify = Fastify();
this.server = Http.createServer((req, res) => this.handle(req, res));
this.manager = new ConcurrencyManager(concurrency);
this.password = Math.random().toString(36).slice(2, 10);

this.fastify.route({
method: 'POST',
url: '/concurrency/acquire',
handler: (req, res) => this.handle(req, res)
});

this.fastify.route({
method: 'DELETE',
url: '/concurrency/cancel',
handler: (req, res) => this.handle(req, res)
})
}

/**
Expand All @@ -50,58 +38,78 @@ export class ConcurrencyServer {
* Gets the address info assigned for this instance
*/
public get info(): AddressInfo {
return this.fastify.addresses().filter(a => a.family === 'IPv4').shift()!;
return this.server.address as unknown as AddressInfo;
}

/**
* Handles the incoming requests
* @param request
* @param reply
* @param response
* @private
*/
private async handle(request: FastifyRequest, reply: FastifyReply): Promise<string> {
private async handle(request: Http.IncomingMessage, response: Http.ServerResponse): Promise<void> {
if (!request.url || request.method !== 'POST' && request.method !== 'DELETE') {
response.statusCode = 404;
response.statusMessage = 'Not Found';
return void response.end();
}

if (request.headers['authorization'] !== this.password) {
reply.code(401);
return 'Unauthorized';
response.statusCode = 401;
response.statusMessage = 'Unauthorized';
return void response.end();
}

// @ts-expect-error: this is ok
if (!request.query.hasOwnProperty('shardId')) {
reply.code(400);
return 'Missing shardId query string';
response.statusCode = 400;
response.statusMessage = 'Bad Request';
return void response.end('Missing shardId query string');
}

// @ts-expect-error: this is ok
const shardId = Number(request.query['shardId']);

if (isNaN(shardId)) {
reply.code(400);
return 'Expected shardId to be a number';
response.statusCode = 400;
response.statusMessage = 'Bad Request';
return void response.end('Expected shardId to be a number');
}

if (request.url.includes('/concurrency/cancel')) {

if (request.method === 'DELETE' && request.url.includes('/concurrency/cancel')) {
this.manager.abortIdentify(shardId);
response.statusCode = 200;
response.statusMessage = 'OK';
return void response.end();
}

} else {
if (request.method === 'DELETE' && request.url.includes('/concurrency/acquire')) {
try {
await this.manager.waitForIdentify(shardId);
response.statusCode = 204;
response.statusMessage = 'OK';
return void response.end();
} catch (error) {
reply.code(202);
return 'Acquire lock cancelled';
response.statusCode = 202;
response.statusMessage = 'Accepted';
return void response.end('Acquire lock cancelled');
}
}

reply.code(204);

return '';
response.statusCode = 404;
response.statusMessage = 'Not Found';
return void response.end();
}

/**
* Starts this server
*/
public async start(): Promise<AddressInfo> {
await this.fastify.listen();
return this.fastify.addresses().filter(a => a.family === 'IPv4').shift()!;
public start(): Promise<AddressInfo> {
return new Promise((resolve, reject) => {
this.server.listen((error: Error, addressInfo: AddressInfo) => {
if (error) return reject(error);
resolve(addressInfo);
})
})
}
}

0 comments on commit fb89832

Please sign in to comment.