Skip to content

Commit

Permalink
test rabbitmq email service
Browse files Browse the repository at this point in the history
  • Loading branch information
Nguyen-Duc-Khai committed Dec 9, 2023
1 parent 06e5d9b commit ced1d06
Show file tree
Hide file tree
Showing 11 changed files with 1,486 additions and 1,320 deletions.
11 changes: 9 additions & 2 deletions backend/.env.example
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
PORT=3000
NODE_ENV=development
RABBITMQ=localhost

DB_HOST=localhost
DB_PORT=3306
Expand All @@ -18,4 +17,12 @@ MQTT_USER=emqx
MQTT_PASSWORD=public

VAPID_PUBLIC_KEY=<VAPID_PUBLIC_KEY>
VAPID_PRIVATE_KEY=<VAPID_PRIVATE_KEY>
VAPID_PRIVATE_KEY=<VAPID_PRIVATE_KEY>

AQI_API_TOKEN=4ae4ef041a1ee5684232e273986a11d9983b5093

PREDICTION_SERVICE_URL=http://localhost:8081

RABBITMQ=amqp://admin:admin@localhost:5672
RABBITMQ_MAILSERVICE_EXCHANGE_NAME=EmailExchange
RABBITMQ_MAILSERVICE_QUEUE_NAME=MailServiceQueue
8 changes: 8 additions & 0 deletions backend/src/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ const { errorHandler } = require("./middlewares/errors");
const ApiError = require("./utils/ApiError");
const httpStatus = require("http-status");

const Producer = require("./rabbitmq/producer");
const producer = new Producer();

const start = async (agenda) => {
const app = express();

Expand Down Expand Up @@ -43,6 +46,11 @@ const start = async (agenda) => {
const apiRoutes = (app, io, mqtt) => {
app.use("/v1", v1Route);

app.post("/amqp", async (req, res, next) => {
await producer.publishEmailMessage(req.body.mailList);
res.send("Sent");
});

app.get("/", async (req, res) => {
res.set("Content-Type", "text/html");
res.send(
Expand Down
8 changes: 8 additions & 0 deletions backend/src/config/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ const envVarsSchema = Joi.object()
VAPID_PRIVATE_KEY: Joi.string(),
AQI_API_TOKEN: Joi.string(),
PREDICTION_SERVICE_URL: Joi.string(),
RABBITMQ: Joi.string(),
RABBITMQ_MAILSERVICE_EXCHANGE_NAME: Joi.string(),
RABBITMQ_MAILSERVICE_QUEUE_NAME: Joi.string(),
})
.unknown();

Expand Down Expand Up @@ -62,4 +65,9 @@ module.exports = {
vapidPrivateKey: envVars.VAPID_PRIVATE_KEY,
aqiApiToken: envVars.AQI_API_TOKEN,
predictionServiceUrl: envVars.PREDICTION_SERVICE_URL,
rabbitmq: {
url: envVars.RABBITMQ,
mailServiceExchangeName: envVars.RABBITMQ_MAILSERVICE_EXCHANGE_NAME,
mailServiceQueueName: envVars.RABBITMQ_MAILSERVICE_QUEUE_NAME,
},
};
39 changes: 39 additions & 0 deletions backend/src/rabbitmq/producer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const amqp = require("amqplib");
const config = require("../config/config");
const logger = require("../config/logger");

class Producer {
channel;

async createChannel() {
const connection = await amqp.connect(config.rabbitmq.url);
this.channel = await connection.createChannel();
}

async publishEmailMessage(message, routingKey = "MailService") {
if (!this.channel) {
await this.createChannel();
}

const mailServiceExchangeName = config.rabbitmq.mailServiceExchangeName;
await this.channel.assertExchange(mailServiceExchangeName, "direct");

const logDetails = {
logType: routingKey,
message: message,
dateTime: new Date(),
};

await this.channel.publish(
mailServiceExchangeName,
routingKey,
Buffer.from(JSON.stringify(logDetails))
);

logger.info(
`The new ${routingKey} is sent to exchange ${mailServiceExchangeName}`
);
}
}

module.exports = Producer;
1 change: 0 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ services:
- "15672:15672"
volumes:
- ./airchecker/rabbitmq:/var/lib/rabbitmq
- ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
networks:
- node-network

Expand Down
39 changes: 35 additions & 4 deletions email-service/app.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,38 @@
const config = require("./config.json");
const logger = require("./logger");
const amqp = require("amqplib");

const sendMail = require("./transporter");

const mailList = ["nguyenduckhai8101@gmail.com", "19521658@gm.uit.edu.vn"];
async function consumeMessages() {
const connection = await amqp.connect(config.amqp.url);
const channel = await connection.createChannel();

await channel.assertExchange(config.amqp.exchangeName, "direct");

const q = await channel.assertQueue(config.amqp.queueName);

await channel.bindQueue(
q.queue,
config.amqp.exchangeName,
config.amqp.bindingKey
);

channel.consume(q.queue, async (msg) => {
const data = JSON.parse(msg.content);
logger.info(
`Receive message from exchange [${
config.amqp.exchangeName
}], data ${JSON.stringify(data)}`
);

const { message: mailList } = data;
console.log(mailList);

await sendMail(mailList);

channel.ack(msg);
});
}

(async () => {
await sendMail(mailList);
})();
consumeMessages();
8 changes: 6 additions & 2 deletions email-service/config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
{
"amqp": "amqp://localhost",
"queue": "nodemailer-amqp",
"amqp": {
"exchangeName": "EmailExchange",
"queueName": "MailServiceQueue",
"bindingKey": "MailService",
"url": "amqp://admin:admin@localhost:5672"
},

"smtp_server": {
"port": 465,
Expand Down
2 changes: 1 addition & 1 deletion email-service/logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const logger = winston.createLogger({
winston.format.errors({ stack: true }),
winston.format.json()
),
defaultMeta: { application: "mail_service" },
defaultMeta: { application: "email_service" },
transports: [
new winston.transports.Console({
format: winston.format.simple(),
Expand Down
Loading

0 comments on commit ced1d06

Please sign in to comment.