End-to-end demo: Change Data Capture (CDC) from Postgres, streaming to Kafka, and archiving to MinIO using Kafka Connectors.
flowchart LR
    %% Data origin: the database whose changes are captured
    subgraph "Source"
        pg["Postgres<br/>DB"]
    end
    %% Event transport: the broker and the Kafka Connect worker
    subgraph "Streaming"
        broker["Kafka<br/>(Broker)"]
        connect["Kafka Connect<br/>Debezium Connector"]
    end
    %% Archive destination
    subgraph "Sink"
        store["MinIO<br/>Object Storage"]
    end
    %% End-to-end path: capture -> publish -> archive
    pg -->|"CDC (Debezium)"| connect
    connect -->|"Publishes events"| broker
    broker -->|"Sink Connector"| store
Flow:
- Postgres emits change events (CDC) via Debezium.
- Kafka Connect reads changes and publishes to Kafka topics.
- Kafka Sink Connector writes data from Kafka to MinIO.
- A k8s cluster (kind, minikube, or k3d all work)
- On my local k8s cluster, I've bound the k8s API to IP 192.168.0.100.
- An ingress controller
- Ingresses are served on the API IP. Customize the ingress in the Helm values, or disable it if you don't need it.
See my MinIO repository for more information if needed.
helm repo add minio-operator https://operator.min.io
# Install operator
helm upgrade -i operator minio-operator/operator \
--namespace minio-operator \
--create-namespace \
--version 7.1.1
# Install Tenant (MiniO storage)
helm upgrade -i tenant minio-operator/tenant \
--namespace minio-operator \
--create-namespace \
--version 7.1.1 \
-f helm-values/minio/minio-tenant.yaml- UI: https://minio-192.168.0.100.nip.io
- Credentials: user:
admin, password:admin123 - Create bucket from UI or with helm of CLI:
mc mb minio/postgres-sink
See my Postgres repository for additional installation options, such as using an operator.
helm repo add bitnami https://charts.bitnami.com/bitnami
# Install PostgreSQL. Debezium's logical decoding requires wal_level=logical;
# presumably the values file enables it — confirm with the pg_settings query below.
helm upgrade -i bitnami-postgresql bitnami/postgresql \
  --namespace bitnami-postgres --create-namespace \
  --version 16.7.21 \
  -f helm-values/postgresql/bitnami-postgresql.yaml
See my kafka repository for extra installation options.
# Install kafka (reuses the bitnami repo added for PostgreSQL)
helm upgrade -i kafka bitnami/kafka \
  -n kafka --create-namespace \
  --version 31.3.1 \
  -f helm-values/kafka/broker.yaml \
  -f helm-values/kafka/plaintext.yaml \
  -f helm-values/kafka/kraft.yaml
# Install UI
helm repo add kafbat-ui https://kafbat.github.io/helm-charts
helm upgrade -i kafka-ui kafbat-ui/kafka-ui \
  -n kafka-ui --create-namespace \
  --version 1.5.1 \
  -f helm-values/kafka/kafbat-ui.yaml
# Create the connectors: Postgres CDC source first, then the S3 (MinIO) sink.
# NOTE(review): these manifests presumably need a running Kafka Connect cluster
# (e.g. a Strimzi KafkaConnect with its CRDs); that install step is not shown here — confirm.
kubectl apply -f connectors/postgres.yaml
kubectl apply -f connectors/s3.yaml
-- Verify Configuration
-- Logical decoding prerequisites for Debezium (pgoutput):
--   wal_level               must be 'logical' (changing it requires a server restart)
--   max_replication_slots   needs a free slot for each connector
--   max_wal_senders         needs a free sender for each connector's replication connection
--   rds.logical_replication only exists on Amazon RDS; elsewhere it typically returns no row
SELECT
    name,
    setting
FROM pg_settings
WHERE name IN ('wal_level', 'rds.logical_replication', 'max_replication_slots', 'max_wal_senders')
ORDER BY name;
-- Equivalent quick checks:
-- SHOW wal_level;
-- SHOW max_replication_slots;
-- SELECT * FROM pg_replication_slots;
-- Create Dedicated CDC User
-- Demo password: change it, and keep it in sync with the credentials the Debezium connector uses.
CREATE USER cdc_user WITH REPLICATION PASSWORD 'secure_password';
-- Grant necessary permissions
GRANT USAGE ON SCHEMA public TO cdc_user;
-- Covers tables that already exist in the schema...
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- ...and tables created later, but only those created by the role running this statement.
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO cdc_user;
-- NOTE: if the connector auto-creates the publication while logged in as this user, Postgres
-- requires it to own the published tables (or be superuser for FOR ALL TABLES); otherwise
-- create the publication manually with an owning role.
-- For RDS, grant additional role. rds_replication exists only on Amazon RDS/Aurora, so the
-- grant is guarded: an unconditional GRANT fails on self-managed Postgres (e.g. this Bitnami install).
DO $$
BEGIN
    IF EXISTS (SELECT 1 FROM pg_roles WHERE rolname = 'rds_replication') THEN
        GRANT rds_replication TO cdc_user;
    ELSE
        RAISE NOTICE 'rds_replication role not found; skipping RDS-specific grant';
    END IF;
END
$$;
-- Set Up Tables (IF NOT EXISTS keeps the script re-runnable)
CREATE TABLE IF NOT EXISTS public.student (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100)
);
-- REPLICA IDENTITY controls how much of the OLD row Postgres writes to WAL for UPDATE/DELETE.
-- With the default (primary key) identity, at most the old key columns are logged. With FULL,
-- every column is logged, so Debezium's "before" image carries the complete previous row.
-- Use FULL when:
--   - the table has no primary key, or
--   - you want the before-image of UPDATEs (i.e., old values), or
--   - you want UPDATE/DELETE events that still carry non-key columns.
-- student has a primary key; FULL is set here for the second and third reasons.
-- Trade-off: FULL increases WAL volume for every UPDATE/DELETE on the table.
ALTER TABLE public.student REPLICA IDENTITY FULL;
-- Create replication slot (KafkaConnector can create it automatically)
-- SELECT pg_create_logical_replication_slot('debezium_slot', 'pgoutput');
-- Create publication for specific table (KafkaConnector can create it automatically)
-- CREATE PUBLICATION debezium_publication FOR TABLE public.student;
-- Test CDC Functionality
-- Each statement below is expected to surface as change events on the connector's topic.
-- The hard-coded ids (1, 2) assume a freshly created table whose SERIAL sequence starts at 1.
-- Insert sample data
INSERT INTO student (name)
VALUES
    ('John Doe'),
    ('Jane Smith');
INSERT INTO student (name)
VALUES ('New Student');
-- Rename the first row (UPDATE event)
UPDATE student
SET name = 'Updated Name'
WHERE id = 1;
-- Remove the second row (DELETE event)
DELETE FROM student
WHERE id = 2;
-- Bulk-insert 10 pseudo-random names instead of adding rows one by one.
-- Each name is a 20-character window of the alphabet starting at a random position
-- 1..7 (26 - 20 + 1 = 7 possible starts). floor() keeps the start in that range;
-- rounding via ::integer alone could produce 8, which yields a 19-character name.
INSERT INTO student (name)
SELECT SUBSTR('ABCDEFGHIJKLMNOPQRSTUVWXYZ', (floor(random() * (26 - 20 + 1)) + 1)::integer, 20)
FROM generate_series(1, 10);
-- Show the table contents in id order
SELECT
    id,
    name
FROM student
ORDER BY id;
-- Monitor and Troubleshoot
-- Check Replication Slots. A slot keeps WAL from restart_lsn onward even while inactive,
-- so a growing retained_wal with active = false means the connector is not consuming.
-- pg_current_wal_lsn() must run on the primary (it errors on a standby in recovery).
SELECT
    slot_name,
    plugin,
    slot_type,
    database,
    active,
    restart_lsn,
    confirmed_flush_lsn,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
ORDER BY slot_name;
-- Check Publications and which operations they publish
SELECT
    oid,
    pubname,
    puballtables,
    pubinsert,
    pubupdate,
    pubdelete
FROM pg_publication
ORDER BY pubname;
-- Tables included in every publication
SELECT
    pubname,
    schemaname,
    tablename
FROM pg_publication_tables
ORDER BY pubname, schemaname, tablename;
-- Tables included in the Debezium publication
-- (name presumably matches publication.name in the connector config — confirm)
SELECT
    schemaname,
    tablename
FROM pg_publication_tables
WHERE pubname = 'debezium_publication'
ORDER BY schemaname, tablename;
-- Cleanup Commands (if needed)
-- Stop or delete the Debezium connector first: dropping a slot that is still in use fails
-- with "replication slot ... is active", and a running connector may recreate the slot
-- and publication. Names presumably match slot.name / publication.name in the connector config.
-- If you want to delete a replication slot (no-op when the slot does not exist)
SELECT pg_drop_replication_slot(slot_name)
FROM pg_replication_slots
WHERE slot_name = 'debezium_slot';
-- To delete a publication
DROP PUBLICATION IF EXISTS debezium_publication;