-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcluster.js
223 lines (200 loc) · 8.37 KB
/
cluster.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
var cluster = require('cluster');
var Debug = require('console-debug');
const configReader = require('./modules/config-reader.js');
const config = configReader('./config.json');
var console = new Debug({
// Do we want to catch uncaughtExceptions?
uncaughtExceptionCatch: false,
// Filter these console output types
consoleFilter: config.debug? [] : ['LOG', 'WARN', 'DEBUG', 'INFO'],
// do we want pretty pony colors in our console output?
colors: true
});
const Client = require('node-rest-client').Client;
const ServerManager = require('./modules/server-manager.js');
const ServerSupervisor = require('./modules/server-supervisor.js');
const express = require('express');
var redis = require('redis');
var redis_client = redis.createClient();
//cluster master
if (cluster.isMaster) {
const numCPUs = require('os').cpus().length;
console.info(`Starting up cluster with ${numCPUs} processes`);
//creamos tantos hilos como cpus tengamos disponibles
for (var i = 0; i < numCPUs; i++) {
var worker = cluster.fork();
}
//definimos qué hacer una vez que se levanta cada worker
cluster.on('online', (worker) => {
console.info(`Worker ${worker.process.pid} is online`);
});
//definimos qué pasa cuando un worker se cae
cluster.on('exit', (worker, code, signal) => {
console.error(`Worker ${worker.process.pid} died`);
console.info('Starting new worker...');
cluster.fork();
});
//se reciben los mensajes de los workers
cluster.on('message', (worker, message, handle) => {
for (var x in cluster.workers){
if(cluster.workers[x].process.pid != message.pid){
cluster.workers[x].send({ message: message.message, host: message.host} );
}
}
});
}
//cluster workers
else{
const srvMan = new ServerManager(config.serverList, config.serverExclusionTime, config.requests);
var app = express();
const numberOfRetries = 3;
//todos los requests entrantes
app.all("*", (req, res) => {
console.info(`Recibí un request ${req.method} de ${req.ip}`);
console.debug(req.url);
getFromCache(req, res, ()=>{
//si no lo puede obtener de la cache
//determino la cantidad de retries disponibles según el tipo de request
var maxNumberOfRetries = config.maxRetryCount;
if (req.method === "POST" || req.method === "PUT" || req.method === "DELETE")
maxNumberOfRetries = 0;
//procedo a hacer el request
makeRequest(srvMan, req, res, process, maxNumberOfRetries);
});
});
//levanto el servidor
app.listen(config.listenPort, () => {
console.info(`Se levanta el load balancer`);
console.info(`[${process.pid}] Escuchando en ${config.listenPort}`);
}).on('error', function(err) {
if (err.errno === 'EADDRINUSE')
console.debug('Puerto ocupado '+config.listenPort);
else
console.debug(err);
});
process.on("message", (message) => {
console.info(`[${process.pid}] server ${message.host} offline`);
console.debug(message);
srvMan.setServerOffline(message.host);
});
//se agrega la ruta para registrar el heartbeat
app.route('/register').get(function (req, res) {
console.info('se registró el heartbeat');
new ServerSupervisor(srvMan, config);
res.json({status: 'heartbeat ok'})
});
}
/**
* Intenta obtener la información de caché en caso de ser necesario
* @param {Express.Request} req
* @param {Express.Response} res
* @param {function} callback La función a ser ejecutada en caso de un cache miss o cuando no se usa cache
*/
function getFromCache(req, res, callback) {
//si el request no puede usar cache
if(!canUseCache(req)) {
callback(); //ejecuto el callback
return;
}
//intento obtener los datos del caché
redis_client.get(getCacheKey(req), (err, reply) => {
if(reply !== null)
res.send(reply); //cache hit! respondo al cliente
else //cache miss. ejecuto el callback (hacer el request al server destino)
callback();
});
}
/**
* Determina si corresponde o no usar caché
* @param {Express.Request} req
*/
function canUseCache(req){
// console.debug(req.headers);
// console.debug(req.header('cache-control'));
// console.debug(req.header('expires'));
var forbiddenByCacheControl = req.header('cache-control') !== undefined && req.header('cache-control') === "no-cache";
var forbiddenByExpiration = req.header('expires') !== undefined && req.header('expires') == 0;
// console.debug(`cc ${forbiddenByCacheControl} - exp ${forbiddenByExpiration}`);
return req.method === "GET" && !forbiddenByCacheControl && !forbiddenByExpiration;
}
/**
* me devuelve el nombre con el que voy a guardar los responses en el cache
* @param {Express.Request} req el objeto del request con el que estamos trabajando
* @return {string} sum
*/
function getCacheKey(req){
//sabemos que la url puede ser muy larga y que una key larga para redis puede no ser lo mejor
// pero lo hacemos así igual porque es la forma más sencilla de identificar requests
// tabién se podría generar un hash, pero requiere más procesamiento
return req.method + "_" + req.url;
}
/**
* Realiza el request a un servidor disponible
* @param {ServerManager} srvMan
* @param {Express.Request} req
* @param {Express.Response} res
* @param {Process} process
* @param {number} retries la cantidad de reintentos disponibles
*/
function makeRequest(srvMan, req, res, process, retries){
var client = new Client();
var theHost;
//le pido un servidor al manager
theHost = srvMan.getServer(req.url);
console.info(`[${process.pid}] Se va a hacer un request al servidor ${theHost}`);
if(theHost == null) {
console.error(`[${process.pid}] no available servers`);
res.sendStatus(502);
return;
}
//llamo a algún servidor externo
var r_req = client.get(theHost+req.url, { requestConfig: { timeout: 2000 }, responseConfig: { timeout: 1000 } }, (data, response) => {
//parseo la respuesta
if(Buffer.isBuffer(data)){
data = data.toString('utf8');
}
//respondo al cliente
res.send(`[${process.pid}] - ${data}`);
if(canUseCache(req)){
//guardo en caché
redis_client.setex(getCacheKey(req), config.cacheTimeout, `[${process.pid}] - ${data}`);
}
});
//atiendo el evento de error del request al servidor destino
r_req.on('error', (err) => {
console.error(`[${process.pid}] request error`);
//pregunto si el header ya se envío porque los eventos de error y requestTimeout se pueden lanzar en simultáneo
if(!res.headerSent) handleErrorFromRemoteServer(theHost, srvMan, req, res, process, retries);
});
//atiendo el evento de timeout del request al servidor destino
r_req.on('requestTimeout', function (req) {
console.error(`[${process.pid}] request has expired`);
//pregunto si el header ya se envío porque los eventos de error y requestTimeout se pueden lanzar en simultáneo
if(!res.headerSent) handleErrorFromRemoteServer(theHost, srvMan, req, res, process, retries);
});
}
/**
* Es la función que se llama en caso de fallo
* @param {string} theHost
* @param {ServerManager} srvMan
* @param {Express.Request} req
* @param {Express.Response} res
* @param {Process} process
* @param {number} retries la cantidad de reintentos disponibles
*/
function handleErrorFromRemoteServer(theHost, srvMan, req, res, process, retries){
srvMan.setServerOffline(theHost);
//le aviso al master que theHost no reponde para que le avise a los demás workers
process.send({
pid: process.pid,
msg: 'serverDown',
host: theHost
});
if(retries > 0){
console.log(`[${process.pid}] remote server error. Remaining attemps ${--retries}`);
makeRequest(srvMan, req, res, process, (retries));
}else{
console.error(`[${process.pid}] can't process request`);
res.sendStatus(503);
}
}