io
is the WebSocket server itself. It manages all client connections and allows broadcasting messages to all connected users.
Think of io
as a chat room manager who handles all users joining, leaving, and sending messages.
When a new client connects, io
detects it and gives them a unique socket (ID).
this.io.on("connection", (socket) => {
// Handle new connection
});
This means: 👉 "When someone joins (connects), give them a socket to communicate."
socket
represents a single connected user. It allows us to send/receive messages between that specific user and the server.
Every time a user connects, io
assigns them a socket.
This socket is unique to that user, like a personal walkie-talkie.
socket.on("message", (data) => {
console.log("Received message:", data);
socket.broadcast.emit("message", data);
});
This means: 👉 "When this specific user sends a message, share it with all other users except them."
io
(WebSocket Server): Manages all clients.socket
(Single Connection): Represents one specific user. `
- Install dependencies:
npm install express socket.io
- Create a singleton
WebSocketServer
class. - Ensure the same instance is used across the app.
- Handle client connections and events.
npm install express socket.io
Create a new file WebSocketServer.ts
:
import { Server as HttpServer } from "http";
import { Server as SocketIOServer, Socket } from "socket.io";
class WebSocketServer {
private static instance: WebSocketServer;
private io: SocketIOServer;
private constructor(server: HttpServer) {
this.io = new SocketIOServer(server, {
cors: {
origin: "*", // Adjust based on your needs
},
});
this.setupListeners();
}
public static initialize(server: HttpServer): WebSocketServer {
if (!WebSocketServer.instance) {
WebSocketServer.instance = new WebSocketServer(server);
}
return WebSocketServer.instance;
}
public static getInstance(): WebSocketServer {
if (!WebSocketServer.instance) {
throw new Error("WebSocketServer not initialized. Call initialize(server) first.");
}
return WebSocketServer.instance;
}
private setupListeners() {
this.io.on("connection", (socket: Socket) => {
console.log(`Client connected: ${socket.id}`);
socket.on("event:message", (data) => { // SERVER listens when user/client clicks "SEND" button
console.log("Received message:", data);
// publish to pubsub
socket.broadcast.emit("message", data); // Broadcast to all clients, except self
});
socket.on("disconnect", () => {
console.log(`Client disconnected: ${socket.id}`);
});
});
}
public getIO(): SocketIOServer {
return this.io;
}
}
export default WebSocketServer;
Modify your server.ts
or index.ts
:
import express from "express";
import http from "http";
import WebSocketServer from "./WebSocketServer"; // Import the singleton class
const app = express();
const server = http.createServer(app);
// Initialize WebSocket server
WebSocketServer.initialize(server);
app.get("/", (req, res) => {
res.send("WebSocket Server is Running...");
});
const PORT = 5000;
server.listen(PORT, () => {
console.log(`Server running on http://localhost:${PORT}`);
});
If you're using a frontend (React, Vanilla JS, etc.), connect using:
import { io } from "socket.io-client";
const socket = io("http://localhost:5000");
socket.on("connect", () => {
console.log("Connected to WebSocket Server");
});
socket.emit("message:event", "Hello from client!");
socket.on("message", (data) => { // this is listening to the events emited by server
console.log("Received:", data);
});
- Ensures only one WebSocket server instance across the app.
- Prevents multiple WebSocket instances from being created by mistake.
- Allows easy access via
getInstance()
from other parts of the app.
This approach keeps WebSocket handling modular, scalable, and maintainable. 🚀
"use client";
import React, { useCallback, useContext, useEffect, useState } from "react";
import { io, Socket } from "socket.io-client";
interface SocketProviderProps {
children?: React.ReactNode;
}
interface ISocketContext {
sendMessage: (msg: string) => void;
messages: string[];
socketId: string | null;
}
const SocketContext = React.createContext<ISocketContext | null>(null);
export const useSocket = () => {
const state = useContext(SocketContext);
if (!state) throw new Error("SocketContext is undefined");
return state;
};
export const SocketProvider: React.FC<SocketProviderProps> = ({ children }) => {
const [socket, setSocket] = useState<Socket | null>(null);
const [messages, setMessages] = useState<string[]>([]);
const [socketId, setSocketId] = useState<string | null>(null);
// Function to send message
const sendMessage: ISocketContext["sendMessage"] = useCallback(
(msg) => {
console.log("Sending message:", msg);
if (socket) {
socket.emit("message:event", msg);
}
},
[socket]
);
// Function to handle incoming messages
const onMessageReceived = useCallback((data: {message : string}) => {
console.log("Received from server:", data);
setMessages((prev) => [...prev, data.message]);
}, []);
useEffect(() => {
// Connect to the WebSocket server
const _socket = io("http://localhost:8000");
_socket.on("connect", () => {
console.log("Connected with socket ID:", _socket.id);
setSocketId(_socket.id);
});
_socket.on("message:rec", onMessageReceived);
setSocket(_socket);
return () => {
_socket.off("message", onMessageReceived);
_socket.disconnect();
setSocket(null);
setSocketId(null);
};
}, []);
return (
<SocketContext.Provider value={{ sendMessage, messages, socketId }}>
{children}
</SocketContext.Provider>
);
};
import { useSocket } from "./SocketProvider";
const ChatComponent = () => {
const { sendMessage, messages, socketId } = useSocket();
return (
<div>
<h2>Socket ID: {socketId}</h2>
<button onClick={() => sendMessage("Hello Server!")}>Send</button>
<ul>
{messages.map((msg, idx) => (
<li key={idx}>{msg}</li>
))}
</ul>
</div>
);
};
export default ChatComponent;
import { Server as HttpServer } from "http";
import { Server as SocketIOServer, Socket } from "socket.io";
import Redis from "ioredis";
const pub = new Redis({
host: "",
port: 0,
username: "default",
password: "",
});
const sub = new Redis({
host: "",
port: 0,
username: "",
password: "",
});
class WebSocketServer {
private static instance: WebSocketServer;
private io: SocketIOServer;
private constructor(server: HttpServer) {
this.io = new SocketIOServer(server, {
cors: {
origin: "*", // Adjust based on your needs
},
});
this.setupListeners();
sub.subscribe("MESSAGES");
}
public static initialize(server: HttpServer): WebSocketServer {
if (!WebSocketServer.instance) {
WebSocketServer.instance = new WebSocketServer(server);
}
return WebSocketServer.instance;
}
public static getInstance(): WebSocketServer {
if (!WebSocketServer.instance) {
throw new Error("WebSocketServer not initialized. Call initialize(server) first.");
}
return WebSocketServer.instance;
}
private setupListeners() {
this.io.on("connection", (socket: Socket) => {
console.log(`Client connected: ${socket.id}`);
socket.on("event:message", ({message} : {message: string}) => {
console.log("Received message:", message);
await pub.publish("MESSAGES", JSON.stringify({ message }));
await produceMessage(message); // producing in kafka (ignore for now)
});
sub.on("message", async (channel, message) => {
if (channel === "MESSAGES") {
console.log("new message from redis", message);
const parsedMessage = JSON.parse(message); // Convert string to object
io.emit("message:rec", parsedMessage);
}
});
socket.on("disconnect", () => {
console.log(`Client disconnected: ${socket.id}`);
});
});
}
public getIO(): SocketIOServer {
return this.io;
}
}
export default WebSocketServer;
- GET postgress connection string from neonDB.
- Setup Prisma in your Project.
- Write
schema.prisma
and runnpx prisma migrate dev --name init && npx prisma generate
- GET kafka URL.
npm i kafkajs
kafka.ts
: HERE WE ARE
import { Kafka, Producer } from "kafkajs";
import fs from "fs";
import path from "path";
import prismaClient from "./prisma";
const kafka = new Kafka({
brokers: [""],
ssl: {
ca: [fs.readFileSync(path.resolve("./ca.pem"), "utf-8")],
},
sasl: {
username: "",
password: "",
mechanism: "",
},
});
let producer: null | Producer = null;
export async function createProducer() {
if (producer) return producer;
const _producer = kafka.producer();
await _producer.connect();
producer = _producer;
return producer;
}
export async function produceMessage(message: string) { // USE THIS WHILE PUBLISHING TO PUBSUB
const producer = await createProducer();
await producer.send({
messages: [{ key: `message-${Date.now()}`, value: message }],
topic: "MESSAGES",
});
return true;
}
export async function startMessageConsumer() { // RUN THIS FUNCTION AT INDEX.JS
console.log("Consumer is running..");
const consumer = kafka.consumer({ groupId: "default" });
await consumer.connect();
await consumer.subscribe({ topic: "MESSAGES", fromBeginning: true });
await consumer.run({
autoCommit: true,
eachMessage: async ({ message, pause }) => {
if (!message.value) return;
console.log(`New Message Recv..`);
try {
await prismaClient.message.create({
data: {
text: message.value?.toString(),
},
});
} catch (err) {
console.log("Something is wrong");
pause();
setTimeout(() => {
consumer.resume([{ topic: "MESSAGES" }]);
}, 60 * 1000);
}
},
});
}
export default kafka;
- Modify your consumer to collect messages in an array and insert them in bulk:
export async function startMessageConsumer() {
console.log("Consumer is running..");
const consumer = kafka.consumer({ groupId: "default" });
await consumer.connect();
await consumer.subscribe({ topic: "MESSAGES", fromBeginning: true });
let messagesBuffer: { text: string }[] = [];
const BATCH_SIZE = 10; // Number of messages before inserting into DB
const BATCH_INTERVAL = 5000; // 5 seconds
setInterval(async () => {
if (messagesBuffer.length > 0) {
try {
console.log(`Inserting ${messagesBuffer.length} messages into DB..`);
await prismaClient.message.createMany({
data: messagesBuffer,
skipDuplicates: true, // Avoid duplicate inserts
});
messagesBuffer = []; // Clear buffer after insert
} catch (err) {
console.error("DB Insert Error", err);
}
}
}, BATCH_INTERVAL);
await consumer.run({
autoCommit: true,
eachMessage: async ({ message }) => {
if (!message.value) return;
console.log(`Buffered message: ${message.value.toString()}`);
messagesBuffer.push({ text: message.value.toString() });
if (messagesBuffer.length >= BATCH_SIZE) {
try {
console.log(`Inserting ${BATCH_SIZE} messages into DB..`);
await prismaClient.message.createMany({
data: messagesBuffer,
skipDuplicates: true,
});
messagesBuffer = [];
} catch (err) {
console.error("DB Insert Error", err);
}
}
},
});
}