An enterprise-grade asynchronous media generation microservice built with FastAPI, Celery, and PostgreSQL. This service provides RESTful APIs for generating high-quality images using AI models (Replicate API) with robust async processing, retry mechanisms, and persistent storage.
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ FastAPI API │ │ Celery Worker │ │ PostgreSQL │
│ │ │ │ │ │
│ • Job Creation │ │ • Image Gen │ │ • Job Metadata │
│ • Status Check │ │ • File Storage │ │ • Status Track │
│ • File Serving │ │ • Error Handle │ │ • Retry Count │
│ • Job Listing │ │ • Metadata Save │ │ • Timestamps │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└───────────────────────┼───────────────────────┘
│
┌─────────────────┐ ┌─────────────────┐
│ Redis │ │ File Storage │
│ │ │ │
│ • Job Queue │ │ • Media Files │
│ • Result Cache │ │ • Metadata JSON │
│ • Session Store │ │ • S3 Ready │
└─────────────────┘ └─────────────────┘
- FastAPI Application: Async REST API with automatic OpenAPI documentation
- Celery Workers: Background job processing with retry logic and exponential backoff
- PostgreSQL: Persistent storage for job metadata and status tracking
- Redis: Message broker for Celery and result caching
- Replicate API: Real/Mock AI service for image generation (Stable Diffusion, FLUX)
- Dual Storage: Separate storage for media files and generation metadata
- ✅ FastAPI API Layer
POST /api/v1/generate
- Create generation jobsGET /api/v1/status/{job_id}
- Check job statusGET /api/v1/download/{job_id}
- Download generated media
- ✅ Async Job Queue with Celery and Redis
- ✅ PostgreSQL with async SQLAlchemy/SQLModel
- ✅ Retry Logic with exponential backoff
- ✅ Error Handling with persistent error tracking
- ✅ File Storage with local filesystem (S3-ready)
- ✅ Environment Configuration with .env support
- ✅ Typed Pydantic Models for request/response validation
- ✅ Docker Setup with docker-compose
- ✅ Alembic Migrations for schema management
- ✅ Async ORM with SQLModel
- ✅ Job Management: List, filter, and cancel jobs
- ✅ Metadata Storage: Separate JSON metadata files
- ✅ Health Checks and monitoring endpoints
- ✅ Structured Logging with configurable levels
- ✅ CORS Support for frontend integration
- ✅ Comprehensive Error Handling
POST /api/v1/generate
- Create new generation jobGET /api/v1/status/{job_id}
- Get job status and resultsGET /api/v1/jobs
- List all jobs with filtering and paginationGET /api/v1/jobs/{job_id}/metadata
- Get generation metadataDELETE /api/v1/jobs/{job_id}
- Cancel pending/processing job
GET /api/v1/download/{job_id}
- Download generated mediaGET /media/{filename}
- Direct file access (static files)
GET /
- Service informationGET /health
- Basic health check for load balancersGET /health/detailed
- Detailed health check with dependency statusGET /metrics
- Basic metrics for monitoring (job counts, etc.)GET /info
- Detailed service configurationGET /docs
- Interactive API documentation
- Docker and Docker Compose (recommended)
- Python 3.11+ (for local development)
- PostgreSQL 15+ (if not using Docker)
- Redis 7+ (if not using Docker)
git clone <your-repo-url>
cd fleek-media-service
# Copy environment configuration
cp config.env.example config.env
# Start all services (PostgreSQL, Redis, API, Workers, Flower)
docker-compose up -d
# Check service status
docker-compose ps
# View logs
docker-compose logs -f api
docker-compose logs -f worker
# Check health
curl http://localhost:8000/health
# View API documentation
open http://localhost:8000/docs
# Monitor Celery workers
open http://localhost:5555
# Create a generation job
curl -X POST "http://localhost:8000/api/v1/generate" \
-H "Content-Type: application/json" \
-d '{
"prompt": "A serene mountain landscape at sunset with crystal clear lake reflections",
"model_name": "black-forest-labs/flux-schnell",
"parameters": "{\"width\": 1024, \"height\": 1024, \"guidance_scale\": 3.5}"
}'
Response:
{
"id": "123e4567-e89b-12d3-a456-426614174000",
"prompt": "A serene mountain landscape at sunset with crystal clear lake reflections",
"model_name": "black-forest-labs/flux-schnell",
"status": "pending",
"created_at": "2024-01-01T00:00:00Z",
"retry_count": 0
}
# Check job status
curl "http://localhost:8000/api/v1/status/123e4567-e89b-12d3-a456-426614174000"
Response (Completed):
{
"id": "123e4567-e89b-12d3-a456-426614174000",
"status": "completed",
"result_url": "http://localhost:8000/media/123e4567-e89b-12d3-a456-426614174000.png",
"file_size": 1245760,
"completed_at": "2024-01-01T00:01:30Z",
"started_at": "2024-01-01T00:00:05Z"
}
# List all jobs (paginated)
curl "http://localhost:8000/api/v1/jobs?limit=20&offset=0"
# Filter by status
curl "http://localhost:8000/api/v1/jobs?status_filter=completed&limit=10"
# Get generation metadata
curl "http://localhost:8000/api/v1/jobs/123e4567-e89b-12d3-a456-426614174000/metadata"
# Download via API (recommended)
curl "http://localhost:8000/api/v1/download/123e4567-e89b-12d3-a456-426614174000" \
--output generated_image.png
# Direct file access
curl "http://localhost:8000/media/123e4567-e89b-12d3-a456-426614174000.png" \
--output image.png
The service uses a dual storage approach for better organization:
storage/
├── media/ # Generated images
│ └── {job_id}.png # Main image file
└── metadata/ # Generation parameters
└── {job_id}.json # Metadata file
Media File: storage/media/replicate-prediction-uvb7ynit4bhpjds3vn4bx7npeq.png
- High-quality PNG image (1024x1024 pixels)
- Generated using the specified prompt and parameters
Metadata File: storage/metadata/replicate-prediction-uvb7ynit4bhpjds3vn4bx7npeq.json
{
"width": 768,
"height": 768,
"prompt": "an astronaut riding a horse on mars, hd, dramatic lighting",
"scheduler": "K_EULER",
"num_outputs": 1,
"guidance_scale": 7.5,
"num_inference_steps": 50,
"model_name": "black-forest-labs/flux-schnell",
"external_job_id": "replicate-prediction-uvb7ynit4bhpjds3vn4bx7npeq",
"created_at": "2024-01-01T00:00:00Z"
}
This separation allows for:
- Easy analytics on generation parameters
- Debugging and optimization insights
- Separate cleanup policies for media vs metadata
- Better organization for large-scale deployments
# Quick API test
python scripts/test_api.py
# Comprehensive image generation test with quality verification
python scripts/test_image_generation.py
# Complete feature test (tests all new features including metadata, job listing)
python scripts/test_complete_features.py
# Test with custom API endpoint
python scripts/test_image_generation.py http://your-api-url:8000
python scripts/test_complete_features.py http://your-api-url:8000
# Initialize database (if running locally)
python scripts/init_db.py
# Test generation with high-quality settings
curl -X POST "http://localhost:8000/api/v1/generate" \
-H "Content-Type: application/json" \
-d '{
"prompt": "A photorealistic portrait of a majestic lion in golden hour lighting, ultra detailed, 8K quality",
"model_name": "black-forest-labs/flux-schnell",
"parameters": "{\"width\": 1024, \"height\": 1024, \"num_inference_steps\": 4, \"guidance_scale\": 3.5}"
}'
# 1. Install dependencies
pip install -r requirements.txt
# 2. Start external services
docker-compose up postgres redis -d
# 3. Set up environment
cp config.env.example .env
# Edit .env with local database URLs
# 4. Initialize database
python scripts/init_db.py
# 5. Start API server
uvicorn app.main:app --reload
# 6. Start Celery worker (in another terminal)
celery -A app.tasks.celery_app worker --loglevel=info
# 7. Optional: Start Flower monitoring
celery -A app.tasks.celery_app flower
# Create new migration
alembic revision --autogenerate -m "Add new feature"
# Apply migrations
alembic upgrade head
# Check current migration status
alembic current
# View migration history
alembic history
# Rebuild and restart all services
./scripts/docker-restart.sh
# View service logs
docker-compose logs -f api worker
# Access database
docker-compose exec postgres psql -U fleek -d fleek_media
# Access Redis CLI
docker-compose exec redis redis-cli
Key configuration options in config.env
:
# Database Configuration
DATABASE_URL=postgresql+asyncpg://fleek:fleek123@localhost:5432/fleek_media
DATABASE_URL_SYNC=postgresql://fleek:fleek123@localhost:5432/fleek_media
# Redis & Celery
REDIS_URL=redis://localhost:6379/0
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# API Configuration
API_HOST=0.0.0.0
API_PORT=8000
API_DEBUG=true
# File Storage
STORAGE_PATH=./storage/media
STORAGE_BASE_URL=http://localhost:8000/media
# Replicate API Configuration
# For REAL AI generation (replace with your token):
# REPLICATE_API_TOKEN=r8_your_actual_token_here
REPLICATE_API_TOKEN=mock_token_for_demo
# Mock settings (only used when token is mock)
REPLICATE_MOCK_DELAY_MIN=2
REPLICATE_MOCK_DELAY_MAX=10
REPLICATE_MOCK_FAILURE_RATE=0.1
# Job Configuration
MAX_RETRY_ATTEMPTS=3
RETRY_BACKOFF_FACTOR=2
# Logging
LOG_LEVEL=INFO
Mock Mode (Default):
- Uses
REPLICATE_API_TOKEN=mock_token_for_demo
- Generates simple colored images for testing
- Configurable delays and failure rates
- Perfect for development and testing
Real Mode (Production):
- Sign up at Replicate.com
- Get your API token from Account Settings
- Update
config.env
:REPLICATE_API_TOKEN=r8_your_actual_token_here
- Restart the services
# Basic health check (for load balancers)
curl http://localhost:8000/health
# Response: {"status": "healthy", "service": "fleek-media-service", "version": "1.0.0"}
# Detailed health check with dependency status
curl http://localhost:8000/health/detailed
# Response includes database, Redis, and storage connectivity
# Basic metrics
curl http://localhost:8000/metrics
# Response includes job counts by status and system info
# Detailed service info
curl http://localhost:8000/info
Access Flower web interface at http://localhost:5555
to monitor:
- Active workers and their status
- Task queues and processing rates
- Failed tasks and retry attempts
- Real-time task execution
Structured logging with configurable levels:
- INFO: General operation information
- DEBUG: Detailed execution traces
- ERROR: Error conditions and exceptions
- WARNING: Important notices
Configure via LOG_LEVEL
environment variable.
Recommended Production Setup:
Load Balancer: nginx/HAProxy
API Instances: 2-3 containers
Worker Instances: 3-5 containers
Database: PostgreSQL 15+ with replica
Cache: Redis cluster
Storage: S3-compatible object storage
Monitoring: Prometheus + Grafana
# config.env for production
DATABASE_URL=postgresql+asyncpg://user:pass@db-cluster:5432/fleek_media
REDIS_URL=redis://redis-cluster:6379/0
STORAGE_PATH=/app/storage
REPLICATE_API_TOKEN=r8_your_production_token
API_DEBUG=false
LOG_LEVEL=INFO
- Replace default database credentials
- Use environment-specific API tokens
- Enable HTTPS termination at load balancer
- Configure proper CORS origins
- Set up authentication/authorization (not included)
- Enable database connection encryption
- Use secrets management (Vault, K8s secrets)
- Configure rate limiting
- Set up API versioning strategy
Horizontal Scaling:
- Scale API instances behind load balancer
- Scale worker instances based on queue depth
- Use Redis cluster for high availability
- Implement database read replicas
Storage Migration:
- Switch from local storage to S3-compatible
- Update
storage_service
implementation - Configure CDN for media delivery
- Implement lifecycle policies for cleanup
- Follow the local development setup
- Create feature branches from
main
- Write tests for new functionality
- Run the test suite:
python scripts/test_api.py
- Use provided scripts for common tasks
- Check code formatting with
black
andisort
# Format code
black app/ scripts/
isort app/ scripts/
# Lint code
flake8 app/
mypy app/
# Run tests
pytest tests/ -v
Visit http://localhost:8000/docs
for the complete interactive API documentation with:
- All endpoint details and parameters
- Request/response schemas
- Example requests and responses
- Authentication requirements (when implemented)
- Error code descriptions
The OpenAPI 3.0 specification is available at http://localhost:8000/openapi.json
- API Layer: FastAPI with POST /generate and GET /status endpoints
- Async Job Queue: Celery with Redis for background processing
- Persistent Storage: PostgreSQL for job metadata, file system for media
- Retry Logic: Exponential backoff with configurable attempts
- Error Handling: Graceful error management with persistent error tracking
- Configuration: Environment-based configuration with .env support
- Typed Pydantic Models: Full request/response validation
- Docker Setup: Complete containerized environment
- Alembic Migrations: Database schema management
- Async ORM: SQLModel with async SQLAlchemy
- Clean Architecture: Separation of concerns across layers
- Comprehensive Documentation: Setup, usage, and deployment guides
- Testing Suite: Automated API and image generation tests
- Monitoring Ready: Health checks and observability features
- Enterprise Features: Job management, metadata storage, error tracking
Ready for Production Deployment! 🚀
This implementation demonstrates enterprise-grade software engineering practices while maintaining clarity and ease of use. It's ready for immediate deployment with proper environment configuration.