A comprehensive Model Context Protocol (MCP) server for Apache Kafka operations, enabling seamless integration with Claude Desktop and other MCP clients.
- Cluster Management: Monitor cluster health, broker information, and metadata
- Topic Operations: Create, list, describe, and delete Kafka topics
- Message Operations: Send and consume messages with flexible configuration
- Consumer Group Management: List and describe consumer groups
- Real-time Monitoring: Get topic metrics, offsets, and performance data
- Health Checks: Comprehensive cluster health monitoring
- Secure Configuration: Support for SASL, SSL, and various authentication methods
- Python 3.8+ (recent releases of the mcp package require Python 3.10 or later)
- Apache Kafka cluster (local or remote)
- pip package manager
- Clone the repository:
git clone https://github.com/aswinayyolath/kafka-mcp-server.git
cd kafka-mcp-server
- Create a virtual environment:
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
- Install dependencies:
pip install mcp kafka-python
- Configure environment variables:
cp .env.template .env
# Edit .env with your Kafka configuration
Create a .env file based on .env.template:
# Basic Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SECURITY_PROTOCOL=PLAINTEXT
# SASL Configuration (if needed)
KAFKA_SASL_MECHANISM=PLAIN
KAFKA_SASL_USERNAME=your_username
KAFKA_SASL_PASSWORD=your_password
# SSL Configuration (if needed)
KAFKA_SSL_CAFILE=/path/to/ca.pem
KAFKA_SSL_CERTFILE=/path/to/cert.pem
KAFKA_SSL_KEYFILE=/path/to/key.pem
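As a rough illustration of how these variables might be turned into client settings, the sketch below maps them onto keyword arguments accepted by kafka-python clients (`bootstrap_servers`, `security_protocol`, `sasl_mechanism`, `sasl_plain_username`, `sasl_plain_password`, `ssl_cafile`, `ssl_certfile`, `ssl_keyfile`). The helper name `kafka_client_kwargs` is hypothetical, and the server's own loading code may differ.

```python
import os
from typing import Dict, Mapping, Optional

# Environment variable -> kafka-python keyword argument.
ENV_TO_KWARG = {
    "KAFKA_SECURITY_PROTOCOL": "security_protocol",
    "KAFKA_SASL_MECHANISM": "sasl_mechanism",
    "KAFKA_SASL_USERNAME": "sasl_plain_username",
    "KAFKA_SASL_PASSWORD": "sasl_plain_password",
    "KAFKA_SSL_CAFILE": "ssl_cafile",
    "KAFKA_SSL_CERTFILE": "ssl_certfile",
    "KAFKA_SSL_KEYFILE": "ssl_keyfile",
}


def kafka_client_kwargs(env: Optional[Mapping[str, str]] = None) -> Dict[str, object]:
    """Build keyword arguments for a kafka-python client from KAFKA_* variables.

    Hypothetical helper for illustration; not taken from the server's code.
    """
    env = os.environ if env is None else env
    servers = env.get("KAFKA_BOOTSTRAP_SERVERS", "localhost:9092")
    kwargs: Dict[str, object] = {
        # kafka-python accepts a list of "host:port" strings.
        "bootstrap_servers": [s.strip() for s in servers.split(",") if s.strip()],
    }
    for var, kwarg in ENV_TO_KWARG.items():
        value = env.get(var)
        if value:  # skip unset or empty values
            kwargs[kwarg] = value
    return kwargs
```

The returned dictionary could then be unpacked into a client constructor, for example `KafkaProducer(**kafka_client_kwargs())`.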
Start a local Kafka cluster using Docker:
docker-compose up -d
This will start Kafka on localhost:9092.
Run the comprehensive test suite to validate your setup:
# Set environment variables
export KAFKA_BOOTSTRAP_SERVERS=localhost:9092
export KAFKA_SECURITY_PROTOCOL=PLAINTEXT
# Run tests
python test_kafka_mcp.py
The test suite validates:
- Kafka connectivity
- MCP server startup
- All available tools
- Topic operations
- Message operations
- Resources and prompts
Run the MCP server directly:
python kafka_mcp_server.py
Add to your Claude Desktop configuration (claude_desktop_config.json):
{
  "mcpServers": {
    "kafka": {
      "command": "python",
      "args": ["/path/to/kafka_mcp_server.py"],
      "env": {
        "KAFKA_BOOTSTRAP_SERVERS": "localhost:9092",
        "KAFKA_SECURITY_PROTOCOL": "PLAINTEXT"
      }
    }
  }
}
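If the configuration file already lists other MCP servers, the "kafka" entry has to be merged in rather than pasted over them. The following is a minimal sketch of that merge on an in-memory dictionary; `add_kafka_server` is a hypothetical helper and the "other" server in the demo is made up for illustration.

```python
import json
from typing import Any, Dict


def add_kafka_server(config: Dict[str, Any], server_script: str,
                     bootstrap_servers: str = "localhost:9092",
                     security_protocol: str = "PLAINTEXT") -> Dict[str, Any]:
    """Return a copy of a Claude Desktop config with a "kafka" server entry added."""
    merged = json.loads(json.dumps(config))  # deep copy of JSON-compatible data
    servers = merged.setdefault("mcpServers", {})
    servers["kafka"] = {
        "command": "python",
        "args": [server_script],
        "env": {
            "KAFKA_BOOTSTRAP_SERVERS": bootstrap_servers,
            "KAFKA_SECURITY_PROTOCOL": security_protocol,
        },
    }
    return merged


if __name__ == "__main__":
    # "other" is a made-up existing entry that must survive the merge.
    existing = {"mcpServers": {"other": {"command": "node", "args": ["server.js"]}}}
    print(json.dumps(add_kafka_server(existing, "/path/to/kafka_mcp_server.py"), indent=2))
```

Working on a copy keeps the original configuration untouched until the merged result has been reviewed and written back.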
- get_cluster_info - Get comprehensive cluster information
- health_check - Perform cluster health check
- list_topics - List all topics in the cluster
- create_topic - Create a new topic
- describe_topic - Get detailed topic information
- delete_topic - Delete a topic
- get_topic_metrics - Get comprehensive topic metrics
- get_topic_offsets - Get partition offsets
- send_message - Send a single message
- send_batch_messages - Send multiple messages
- consume_messages - Consume messages from a topic
- list_consumer_groups - List all consumer groups
- describe_consumer_group - Get detailed consumer group information
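MCP clients invoke these tools with JSON-RPC 2.0 `tools/call` requests. The sketch below builds such a request to show the message shape. The helper `tool_call_request` is hypothetical, and the argument names passed to `create_topic` are assumptions, since the real names come from each tool's input schema.

```python
import itertools
import json
from typing import Any, Dict

_request_ids = itertools.count(1)  # JSON-RPC requests need unique ids


def tool_call_request(name: str, arguments: Dict[str, Any]) -> str:
    """Serialize an MCP tools/call request as a JSON-RPC 2.0 message."""
    message = {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments},
    }
    return json.dumps(message)


# Argument names below are hypothetical; check the tool's input schema
# (returned by a tools/list request) for the real ones.
print(tool_call_request("create_topic", {"topic_name": "user-events", "partitions": 3}))
```

In practice an MCP client library handles this framing, so the sketch is mainly useful for reading protocol traces while debugging.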
The server provides MCP resources for easy access to cluster information:
- kafka://cluster/info - Real-time cluster information
- kafka://topics/list - Current topics list
Built-in prompts for common scenarios:
- kafka_monitoring_prompt - Generate monitoring and troubleshooting guidance
- kafka_troubleshooting_prompt - Get help with specific issues
Once integrated with Claude Desktop, you can ask:
- "Can you check the health of my Kafka cluster?"
- "List all topics and their partition counts"
- "Create a topic called 'user-events' with 3 partitions"
- "Send a test message to the user-events topic"
- "Show me the last 10 messages from user-events"
- "What consumer groups are active?"
The server supports various Kafka authentication methods:
- PLAINTEXT: No authentication (development only)
- SASL_PLAINTEXT: SASL authentication over plain connection
- SASL_SSL: SASL authentication over SSL
- SSL: SSL client certificate authentication
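Each protocol needs a different subset of the KAFKA_* variables, so it can help to check for missing values before starting the server. The sketch below is one way to do that. The helper `missing_settings` and the `REQUIRED` mapping are assumptions for illustration: for example, a CA file may be unnecessary when the broker certificate is signed by a system-trusted CA.

```python
from typing import Dict, List, Mapping

SASL_VARS = ["KAFKA_SASL_MECHANISM", "KAFKA_SASL_USERNAME", "KAFKA_SASL_PASSWORD"]
CA_VARS = ["KAFKA_SSL_CAFILE"]
CLIENT_CERT_VARS = ["KAFKA_SSL_CERTFILE", "KAFKA_SSL_KEYFILE"]

# Assumed requirements per protocol; adjust to match your cluster's setup.
REQUIRED: Dict[str, List[str]] = {
    "PLAINTEXT": [],
    "SASL_PLAINTEXT": SASL_VARS,
    "SASL_SSL": SASL_VARS + CA_VARS,
    "SSL": CA_VARS + CLIENT_CERT_VARS,
}


def missing_settings(env: Mapping[str, str]) -> List[str]:
    """List the variables that are unset or empty for the chosen protocol."""
    protocol = env.get("KAFKA_SECURITY_PROTOCOL", "PLAINTEXT")
    if protocol not in REQUIRED:
        raise ValueError(f"Unsupported security protocol: {protocol!r}")
    return [var for var in REQUIRED[protocol] if not env.get(var)]
```

Calling `missing_settings(os.environ)` at startup would give a clear list of what to fix instead of a later authentication error.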
- Never commit .env files - Use .env.template for examples
- Use SSL in production - Always encrypt connections to production clusters
- Limit permissions - Use dedicated service accounts with minimal required permissions
- Monitor access - Log and monitor MCP server usage
- Connection refused:
  - Verify Kafka is running
  - Check KAFKA_BOOTSTRAP_SERVERS configuration
  - Ensure network connectivity
- Authentication failures:
  - Verify SASL credentials
  - Check SSL certificate paths
  - Validate security protocol settings
- Topic not found:
  - Ensure topic exists
  - Check topic name spelling
  - Verify permissions
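For the "Connection refused" case, a quick TCP check can separate network problems from Kafka configuration problems. The sketch below uses only the standard library. The helper names are hypothetical, and a successful connection only shows that the port is open, not that a healthy broker is listening on it.

```python
import socket
from typing import Tuple


def parse_bootstrap_server(server: str, default_port: int = 9092) -> Tuple[str, int]:
    """Split "host:port" into (host, port); the port falls back to default_port.

    IPv6 literals such as "[::1]:9092" are not handled in this sketch.
    """
    server = server.strip()
    host, sep, port = server.rpartition(":")
    if not sep:
        return server, default_port
    return host, int(port)


def broker_reachable(server: str, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to the bootstrap server succeeds."""
    host, port = parse_bootstrap_server(server)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # covers refused connections, timeouts, and DNS failures
        return False
```

For example, `broker_reachable("localhost:9092")` returning False while Docker reports the container as running usually points at port mapping or listener configuration.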
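Enable debug logging by configuring the logging module in the same Python process that runs the server:
export PYTHONPATH=.
python -c "import logging, runpy; logging.basicConfig(level=logging.DEBUG); runpy.run_path('kafka_mcp_server.py', run_name='__main__')"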
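Logging can also be configured from inside a Python entry point. The sketch below assumes the server talks to its client over the stdio transport, where stdout carries protocol messages, so log output is sent to stderr. `configure_logging` and the `LOG_LEVEL` variable are hypothetical and are not known to exist in the server.

```python
import logging
import os
import sys

LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def configure_logging(level_name: str = "") -> int:
    """Configure root logging on stderr and return the numeric level applied."""
    # LOG_LEVEL is a hypothetical variable used only when no level is passed in.
    name = (level_name or os.environ.get("LOG_LEVEL", "INFO")).upper()
    if name not in LEVELS:
        raise ValueError(f"Unknown log level: {name!r}")
    logging.basicConfig(
        level=LEVELS[name],
        stream=sys.stderr,  # keep stdout free for MCP protocol messages
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
        force=True,  # replace handlers configured earlier (Python 3.8+)
    )
    return LEVELS[name]
```

Calling `configure_logging("DEBUG")` before the server starts would then surface debug messages from any library that logs through the standard logging module.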
- Fork the repository
- Create a feature branch:
git checkout -b feature-name
- Make your changes
- Run tests:
python test_kafka_mcp.py
- Commit changes:
git commit -am 'Add feature'
- Push to branch:
git push origin feature-name
- Submit a pull request
This project is licensed under the Apache License 2.0 - see the LICENSE file for details.
- Model Context Protocol - For the MCP specification
- Apache Kafka - For the distributed streaming platform
- kafka-python - For the Python Kafka client
For issues and questions:
- Check the troubleshooting section
- Run the test suite to validate your setup
- Review Kafka and MCP documentation
- Open an issue with detailed error information
Happy Kafka streaming with MCP!