Skip to content

Commit

Permalink
initialize repo (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
red2n authored Dec 6, 2024
2 parents 31d898c + 892195b commit 4ab45b3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 19 deletions.
8 changes: 6 additions & 2 deletions apps/orbitHub/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ import { AppServer } from './appServer.js';
import { registerRoute } from './appServerRoute.js';
import { routes } from './routes/register.js';
import { getConnectionString } from './utils.js';
import type { Consumer, Producer } from 'kafkajs';

const SERVICE_NAME = 'App';
const DEFAULT_PORT = '8080';
const DB_COLLECTION_NAME = 'rGuestStay';
const SHUTTING_DOWN_MESSAGE = 'Shutting down gracefully...';
const SERVER_CLOSED_MESSAGE = 'Server closed';
const FIRST_DOCUMENT_MESSAGE = 'First document in rGuestStay collection:';

const consumers: Consumer[] = [];
const producers: Producer[] = [];
dotenv.config();
const PORT: number = Number.parseInt(process.env.PORT || DEFAULT_PORT);
const app: FastifyInstance = AppLogger.getLogger(SERVICE_NAME);
Expand Down Expand Up @@ -48,6 +50,7 @@ const registerHttpRoutes = async () => {
const kafkaInstance = route as KafkaInAsync<any, any, any, any>;
KafkaUtils.initializeConsumer(KafkaEssentials.kafka, KafkaEssentials.kafkaConfig, kafkaInstance.topic)
.then((consumer) => {
consumers.push(consumer);
app.log.info(`Consumer is listening on topic: ${kafkaInstance.topic}`);
consumer.run({
eachMessage: async ({ topic, message }) => {
Expand All @@ -67,6 +70,7 @@ const registerHttpRoutes = async () => {
});

KafkaUtils.initializeProducer(KafkaEssentials.kafka).then((producer) => {
producers.push(producer);
kafkaInstance.produce = async (data: any) => {
try {
await producer.send({
Expand Down Expand Up @@ -140,7 +144,7 @@ initialize();
process.on('SIGINT', async () => {
app.log.info(SHUTTING_DOWN_MESSAGE);
try {
await await KafkaUtils.disconnectFromKafka(KafkaEssentials.kafka, KafkaEssentials.kafkaConfig, app);
await KafkaUtils.disconnectFromKafka(KafkaEssentials.kafka,consumers, producers, app);
await app.close();
app.log.info(SERVER_CLOSED_MESSAGE);
process.exit(0);
Expand Down
5 changes: 5 additions & 0 deletions apps/orbitHub/src/appServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ export class AppServer {
done();
});

app.addHook('onClose', (instance, done) => {
app.log.info("Unregistering all routes");
done();
});

const checkIdleTime = () => {
const currentTime = Date.now();
const idleTime = currentTime - lastActivityTime;
Expand Down
35 changes: 18 additions & 17 deletions modules/starter/src/kafka/kafkaUtils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Kafka, Consumer, Producer } from 'kafkajs';
import { type Kafka, type Consumer, type Producer, Partitioners } from 'kafkajs';
import type { KafkaConfig } from './kafkaConfig.js';
import type { FastifyInstance } from 'fastify/types/instance.js';

Expand Down Expand Up @@ -34,7 +34,7 @@ export class KafkaUtils {
* @returns {Promise<Producer>} A promise that resolves to the connected Kafka producer.
*/
static async initializeProducer(kafka: Kafka): Promise<Producer> {
const producer = kafka.producer();
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner });
await producer.connect();
return producer;
}
Expand All @@ -50,23 +50,24 @@ export class KafkaUtils {
*
* @throws Will log an error if there is an issue disconnecting the consumer or producer.
*/
static async disconnectFromKafka(kafka: Kafka, kafkaConfig: KafkaConfig, app: FastifyInstance) {
const consumer = kafka.consumer({ groupId: kafkaConfig.groupId });
const producer = kafka.producer();

if (consumer) {
await consumer.disconnect().then(() => {
app.log.info("Consumer disconnected from Kafka");
}).catch((err) => {
app.log.error("Error disconnecting consumer from Kafka", err);
});
}
if (producer) {
await producer.disconnect().then(() => {
app.log.info("Producer disconnected from Kafka");
static async disconnectFromKafka(kafka: Kafka, consumers: Consumer[], producers: Producer[], app: FastifyInstance) {
try {
app.log.info('Disconnecting consumers and producers...');
await Promise.all(consumers.map(consumer => consumer.disconnect().then(() => {
app.log.info('Successfully disconnected all consumers from Kafka.');
})));
await Promise.all(producers.map(producer => producer.disconnect().then(() => {
app.log.info('Successfully disconnected all producers from Kafka.');
})));
app.log.info('Disconnecting from Kafka...');
await kafka.admin().disconnect().then(() => {
app.log.info('Successfully disconnected from Kafka.');
}).catch((err) => {
app.log.error("Error disconnecting producer from Kafka", err);
app.log.error('Error disconnecting from Kafka:', err);
});

} catch (err) {
app.log.error('Error disconnecting from Kafka:', err);
}
}
}

0 comments on commit 4ab45b3

Please sign in to comment.