In apps like Uber (for drivers) and Zomato (for delivery agents), one of the biggest technical challenges is real-time tracking. This means:
- Customers should see the exact location of their driver/delivery agent.
- The app should update instantly when the driver moves.
- The restaurant or driver should also get live updates about order pickup and drop-off.
If this tracking is delayed or inaccurate, it leads to:
- ❌ A bad customer experience.
- ❌ Drivers missing important updates.
- ❌ Inefficient route optimization.
- Every driver/delivery person sends new GPS coordinates every few seconds.
- If 100,000 drivers are online, each sending an update roughly every second, that's about 100,000 location updates per second (6 million updates per minute)!
- Location data must be processed and displayed instantly.
- Traditional databases (like PostgreSQL or MySQL) are too slow for this on their own.
- They cannot handle that many read and write operations per second.
- Customers need to see the driver’s live location.
- The logistics team needs real-time route optimization.
- The restaurant needs to know when the driver is arriving.
- The notification system needs to send alerts (e.g., "Your delivery agent is near").
A slow or inefficient system can cause major issues like:
- A driver has reached the destination, but the app still shows them far away.
- A delivery agent picks up the food, but the restaurant doesn’t get an update.
- The customer doesn’t get notifications, leading to confusion.
- Kafka has ⚡ high throughput but 🔻 only temporary, retention-based storage.
- Kafka cannot be queried like a database, so a database is still needed to serve lookups.
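Because of that, reads such as "where is driver 453 right now?" go to the database that a consumer keeps up to date (the batch-insert consumer at the end of this post writes into a driver_locations table). A minimal lookup sketch, assuming that same table and the Postgres connection string used later:
const { Pool } = require("pg");

const pool = new Pool({ connectionString: "postgres://user:password@localhost:5432/zomato" });

// Latest known position for one driver, served from Postgres rather than Kafka.
async function getLatestLocation(driverId) {
  const { rows } = await pool.query(
    `SELECT latitude, longitude, timestamp
       FROM driver_locations
      WHERE driver_id = $1
      ORDER BY timestamp DESC
      LIMIT 1`,
    [driverId]
  );
  return rows[0] || null;
}

getLatestLocation("453").then(console.log).finally(() => pool.end());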
This architecture ensures real-time rider location updates using WebSockets, Kafka, and Consumer Groups.
- The driver's mobile app continuously sends location updates via WebSocket.
- The updates contain:
{
  "driverId": 453,
  "longitude": 77.5946,
  "latitude": 12.9716,
  "state": "south"
}
- The Backend (BE) maintains an open WebSocket connection with each driver (a minimal WebSocket-to-Kafka sketch follows this list).
- The backend Kafka Producer processes incoming location updates.
- The producer publishes messages to the Kafka topic "rider_updates".
- Partitioning Logic:
  - If `state === "south"`, the message is sent to Partition 0 (p0).
  - Otherwise, it is sent to Partition 1 (p1).
- Kafka stores messages in a partitioned topic (rider_updates).
- Partitions ensure parallel processing and high throughput.
- Kafka guarantees ordering within each partition.
- Consumers subscribe to the "rider_updates" topic.
- Consumers read from partitions:
  - One consumer can read from multiple partitions.
  - Within a consumer group, each partition is assigned to exactly one consumer, so consumers never share a partition.
- Possible consumers include the customer-facing app, the logistics/route-optimization service, the restaurant dashboard, and the notification service.
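Here is a minimal sketch of the ingestion path referenced above, tying the WebSocket and producer steps together. It assumes the ws npm package for the WebSocket server (the post does not name a library), a hypothetical filename ws-ingest.js, and a placeholder port; the partitioning rule is the state-based one described above.
ws-ingest.js
const { WebSocketServer } = require("ws");
const { Kafka } = require("kafkajs");

const kafka = new Kafka({ clientId: "ws-ingest", brokers: ["<PRIVATE_IP>:9092"] });

async function init() {
  const producer = kafka.producer();
  await producer.connect();

  // Each driver app keeps one open WebSocket and pushes JSON location updates over it.
  const wss = new WebSocketServer({ port: 8080 });
  wss.on("connection", (socket) => {
    socket.on("message", async (data) => {
      const update = JSON.parse(data.toString()); // { driverId, longitude, latitude, state }
      await producer.send({
        topic: "rider_updates",
        messages: [
          {
            // Same rule as above: "south" -> partition 0, everything else -> partition 1.
            partition: update.state.toLowerCase() === "south" ? 0 : 1,
            key: String(update.driverId),
            value: JSON.stringify(update),
          },
        ],
      });
    });
  });
}

init();
Keying each message by driverId is an extra assumption: even without the explicit partition, that key alone would keep every driver's updates in a single partition and therefore in order.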
- Start the Zookeeper container and expose port 2181.
docker run -p 2181:2181 zookeeper
- Start the Kafka container, expose port 9092, and set the required environment variables.
docker run -p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=<PRIVATE_IP>:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<PRIVATE_IP>:9092 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
confluentinc/cp-kafka
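The Node.js examples below use kafkajs; the batch-insert consumer at the end also needs pg, and the WebSocket ingestion sketch above uses ws. Install them with:
npm init -y
npm install kafkajs pg ws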
client.js
const { Kafka } = require("kafkajs");
exports.kafka = new Kafka({
  clientId: "my-app",
  brokers: ["<PRIVATE_IP>:9092"],
});
admin.js
const { kafka } = require("./client");
async function init() {
  const admin = kafka.admin();

  console.log("Admin connecting...");
  await admin.connect();
  console.log("Admin Connection Success...");

  console.log("Creating Topic [rider-updates]");
  await admin.createTopics({
    topics: [
      {
        topic: "rider-updates",
        numPartitions: 2, // one partition per region in this demo
      },
    ],
  });
  console.log("Topic Created Success [rider-updates]");

  console.log("Disconnecting Admin..");
  await admin.disconnect();
}
init();
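Run the admin script once so the topic exists before starting producers and consumers (assuming the file is saved as admin.js):
node admin.js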
producer.js
const { kafka } = require("./client");
const readline = require("readline");
const rl = readline.createInterface({
  input: process.stdin,
  output: process.stdout,
});

async function init() {
  const producer = kafka.producer();

  console.log("Connecting Producer");
  await producer.connect();
  console.log("Producer Connected Successfully");

  rl.setPrompt("> ");
  rl.prompt();

  rl.on("line", async function (line) {
    // Expected input: "<riderName> <location>", e.g. "tony south"
    const [riderName, location] = line.split(" ");
    if (!riderName || !location) {
      console.log("Usage: <riderName> <location>");
      return rl.prompt();
    }

    await producer.send({
      topic: "rider-updates",
      messages: [
        {
          // "south" goes to partition 0, everything else to partition 1,
          // matching the partitioning logic described above.
          partition: location.toLowerCase() === "south" ? 0 : 1,
          key: "location-update",
          value: JSON.stringify({ name: riderName, location }),
        },
      ],
    });
    rl.prompt();
  }).on("close", async () => {
    await producer.disconnect();
  });
}
init();
consumer.js
const { kafka } = require("./client");
const group = process.argv[2];
async function init() {
  const consumer = kafka.consumer({ groupId: group });
  await consumer.connect();
  await consumer.subscribe({ topics: ["rider-updates"], fromBeginning: true });

  await consumer.run({
    // Called once per message; logs which partition the update came from.
    eachMessage: async ({ topic, partition, message }) => {
      console.log(
        `${group}: [${topic}]: PART:${partition}:`,
        message.value.toString()
      );
    },
  });
}
init();
- Run one or more consumers, each with a consumer group name:
node consumer.js <GROUP_NAME>
- Run the producer and type rider updates at the prompt:
node producer.js
> tony south
> tony north
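With the south-to-partition-0 rule above, a consumer started with, say, group name demo should print lines roughly like this (the format comes from the console.log in consumer.js):
demo: [rider-updates]: PART:0: {"name":"tony","location":"south"}
demo: [rider-updates]: PART:1: {"name":"tony","location":"north"}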
Instead of inserting data row by row, the consumer buffers multiple Kafka messages and writes them to the database in a single insert, either once 1,000 records have accumulated or once per second, whichever comes first.
This minimizes the number of expensive DB writes.
const { Kafka } = require("kafkajs");
const { Pool } = require("pg");
const kafka = new Kafka({ clientId: "zomato-consumer", brokers: ["localhost:9092"] });
const consumer = kafka.consumer({ groupId: "location-consumers" });
const pool = new Pool({ connectionString: "postgres://user:password@localhost:5432/zomato" });
let buffer = []; // Stores messages temporarily
const BATCH_SIZE = 1000; // Adjust based on DB performance
const insertBatchToDB = async () => {
  if (buffer.length === 0) return;

  // Swap the buffer out first so updates arriving while the insert is awaited are not lost.
  const batch = buffer;
  buffer = [];

  // NOTE: values are interpolated into the SQL string for brevity; in production,
  // prefer a parameterized bulk insert (or COPY) to avoid injection/escaping issues.
  const values = batch
    .map((l) => `('${l.driver_id}', ${l.latitude}, ${l.longitude}, '${l.timestamp}')`)
    .join(",");
  const query = `INSERT INTO driver_locations (driver_id, latitude, longitude, timestamp) VALUES ${values}`;

  try {
    await pool.query(query);
    console.log(`✅ Inserted ${batch.length} records into DB`);
  } catch (error) {
    console.error("❌ Error inserting into DB:", error);
    buffer = batch.concat(buffer); // keep the failed batch so it is retried on the next flush
  }
};

const runConsumer = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: "driver_location_updates", fromBeginning: false });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const location = JSON.parse(message.value.toString());
      buffer.push(location);

      // Flush immediately once the batch is full...
      if (buffer.length >= BATCH_SIZE) {
        await insertBatchToDB();
      }
    },
  });

  // ...and flush any partially filled batch every second.
  setInterval(insertBatchToDB, 1000);
};
runConsumer();
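The insert above assumes a driver_locations table already exists; the post does not show its schema, so the column types below are assumptions derived from the INSERT statement. A one-off setup sketch:
const { Pool } = require("pg");

const pool = new Pool({ connectionString: "postgres://user:password@localhost:5432/zomato" });

// Columns mirror the INSERT in the batch consumer; the types are assumptions.
async function createTable() {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS driver_locations (
      driver_id TEXT NOT NULL,
      latitude  DOUBLE PRECISION NOT NULL,
      longitude DOUBLE PRECISION NOT NULL,
      timestamp TIMESTAMPTZ NOT NULL
    )
  `);
  await pool.end();
}

createTable();
An index on (driver_id, timestamp DESC) would keep the latest-location lookup shown earlier cheap even as the table grows.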