.. _kafka-tutorial-replicate-with-cdc:

=================================================
Replicate Data with a Change Data Capture Handler
=================================================

Overview
--------

Follow this tutorial to learn how to use a
**change data capture (CDC) handler** to replicate data with the {+mkc+}.
A CDC handler is an application that translates CDC events into MongoDB
write operations. Use a CDC handler when you need to replicate the
changes in one datastore to another datastore.

In this tutorial, you configure and run MongoDB Kafka source and sink
connectors to make two MongoDB collections contain the same documents using
CDC. The source connector writes change stream data from the original
collection to a Kafka topic, and the sink connector writes the Kafka topic
data to the target MongoDB collection.

If you want to learn more about how CDC handlers work, see the
:ref:`<sink-fundamentals-cdc-handler>` guide.

.. include:: /includes/tutorials/setup.rst

Replicate Data with a CDC Handler
---------------------------------

.. procedure::
   :style: connected

   .. step:: Start Interactive Shells

      Start two interactive shells on the Docker container in separate
      terminal windows. You use these shells to run and observe
      different tasks throughout this tutorial.

      Run the following command in one terminal window to start an
      interactive shell called **Shell1**:

      .. code-block:: bash
         :caption: This command starts an interactive shell called Shell1

         docker run --rm --name Shell1 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

      Run the following command in the other terminal window to start an
      interactive shell called **Shell2**:

      .. code-block:: bash
         :caption: This command starts an interactive shell called Shell2

         docker run --rm --name Shell2 --network mongodb-kafka-base_localnet -it robwma/mongokafkatutorial:latest bash

      Arrange the two windows on your screen to keep both of them visible
      so you can see real-time updates.

      Use **Shell1** to configure your connectors and monitor your Kafka
      topic. Use **Shell2** to perform write operations in MongoDB.

   .. step:: Configure the Source Connector

      In **Shell1**, configure a source connector to read from the
      ``CDCTutorial.Source`` MongoDB namespace and write to the
      ``CDCTutorial.Source`` Kafka topic.

      Create a configuration file called ``cdc-source.json`` using the
      following command:

      .. code-block:: bash

         nano cdc-source.json

      Paste the following configuration information into the file and save
      your changes:

      .. code-block:: json

         {
           "name": "mongo-cdc-source",
           "config": {
             "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
             "connection.uri": "mongodb://mongo1",
             "database": "CDCTutorial",
             "collection": "Source"
           }
         }

      Run the following command in **Shell1** to start the source connector
      using the configuration file you created:

      .. code-block:: bash

         cx cdc-source.json

      .. note::

         The ``cx`` command is a custom script included in the tutorial
         development environment. This script runs the following
         equivalent request to the Kafka Connect REST API to create a new
         connector:

         .. code-block:: bash

            curl -X POST -H "Content-Type: application/json" -d @cdc-source.json http://connect:8083/connectors -w "\n"
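
      Any HTTP client can make the same request. The following Python
      sketch posts a connector configuration file to the Kafka Connect
      REST API; it assumes the ``connect:8083`` endpoint used in this
      tutorial environment, and the helper names are illustrative rather
      than part of the tutorial tooling:

      ```python
      import json
      import urllib.request

      def build_create_request(config, connect_url="http://connect:8083"):
          # Serialize the connector configuration into the POST body
          # expected by the Kafka Connect REST API.
          payload = json.dumps(config).encode("utf-8")
          return urllib.request.Request(
              f"{connect_url}/connectors",
              data=payload,
              headers={"Content-Type": "application/json"},
              method="POST",
          )

      def register_connector(config_path):
          # Load a configuration file such as cdc-source.json and
          # send it to the Kafka Connect REST API.
          with open(config_path) as f:
              req = build_create_request(json.load(f))
          with urllib.request.urlopen(req) as resp:
              return json.loads(resp.read())
      ```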

      Run the following command in the shell to check the status of the
      connectors:

      .. code-block:: bash

         status

      If your source connector started successfully, you should see the
      following output:

      .. code-block:: none
         :copyable: false

         Kafka topics:
         ...
         The status of the connectors:

         source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

         Currently configured connectors

         [
           "mongo-cdc-source"
         ]
         ...
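
      The connector list in this output is also available directly from
      the Kafka Connect REST API. The following Python sketch fetches and
      parses it; it assumes the same ``connect:8083`` endpoint used
      elsewhere in this tutorial, and the helper names are illustrative:

      ```python
      import json
      import urllib.request

      def parse_connector_list(body):
          # Kafka Connect's GET /connectors endpoint returns a JSON
          # array of connector names.
          return sorted(json.loads(body))

      def list_connectors(connect_url="http://connect:8083"):
          # Fetch the currently configured connectors from the REST API.
          with urllib.request.urlopen(f"{connect_url}/connectors") as resp:
              return parse_connector_list(resp.read())
      ```
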

   .. step:: Configure the Sink Connector

      In **Shell1**, configure a sink connector to copy data from the
      ``CDCTutorial.Source`` Kafka topic to the ``CDCTutorial.Destination``
      MongoDB namespace.

      Create a configuration file called ``cdc-sink.json`` using the
      following command:

      .. code-block:: bash

         nano cdc-sink.json

      Paste the following configuration information into the file and save
      your changes:

      .. code-block:: json

         {
           "name": "mongo-cdc-sink",
           "config": {
             "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
             "topics": "CDCTutorial.Source",
             "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.mongodb.ChangeStreamHandler",
             "connection.uri": "mongodb://mongo1",
             "database": "CDCTutorial",
             "collection": "Destination"
           }
         }

      Run the following command in the shell to start the sink connector
      using the configuration file you created:

      .. code-block:: bash

         cx cdc-sink.json

      Run the following command in the shell to check the status of the
      connectors:

      .. code-block:: bash

         status

      If your sink connector started successfully, you should see the
      following output:

      .. code-block:: none
         :copyable: false

         Kafka topics:
         ...
         The status of the connectors:

         sink | mongo-cdc-sink | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSinkConnector
         source | mongo-cdc-source | RUNNING | RUNNING | com.mongodb.kafka.connect.MongoSourceConnector

         Currently configured connectors

         [
           "mongo-cdc-sink",
           "mongo-cdc-source"
         ]
         ...

   .. step:: Monitor the Kafka Topic

      In **Shell1**, monitor the Kafka topic for incoming events. Run the
      following command to start the ``kafkacat`` application, which
      outputs data published to the topic:

      .. code-block:: bash

         kc CDCTutorial.Source

      .. note::

         The ``kc`` command is a custom script included in the tutorial
         development environment that calls the ``kafkacat`` application
         with options to connect to Kafka and format the output of the
         specified topic.

      Once started, you should see the following output, which indicates
      that there is currently no data to read:

      .. code-block:: none
         :copyable: false

         % Reached end of topic CDCTutorial.Source [0] at offset 0

   .. step:: Write Data into the Source and Watch the Data Flow

      In **Shell2**, connect to MongoDB using ``mongosh``, the MongoDB
      shell, by running the following command:

      .. code-block:: bash

         mongosh "mongodb://mongo1"

      After you connect successfully, you should see the following
      MongoDB shell prompt:

      .. code-block:: none
         :copyable: false

         rs0 [direct: primary] test>

      At the prompt, type the following commands to insert a new document
      into the ``CDCTutorial.Source`` MongoDB namespace:

      .. code-block:: javascript

         use CDCTutorial
         db.Source.insertOne({ proclaim: "Hello World!" });

      Once MongoDB completes the insert command, you should receive an
      acknowledgment that resembles the following text:

      .. code-block:: javascript
         :copyable: false

         {
           acknowledged: true,
           insertedId: ObjectId("600b38ad...")
         }

      The source connector picks up the change and publishes it to the
      Kafka topic. You should see the following topic message in your
      **Shell1** window:

      .. code-block:: json
         :copyable: false

         {
           "schema": { "type": "string", "optional": false },
           "payload": {
             "_id": { "_data": "8260..." },
             "operationType": "insert",
             "clusterTime": { "$timestamp": { "t": 1611..., "i": 2 } },
             "fullDocument": {
               "_id": { "$oid": "600b38ad..." },
               "proclaim": "Hello World!"
             },
             "ns": { "db": "CDCTutorial", "coll": "Source" },
             "documentKey": { "_id": { "$oid": "600b38a..." } }
           }
         }

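      The message above is the change event that the sink connector's
      ``ChangeStreamHandler`` consumes. As a conceptual illustration only
      (the actual handler is implemented in Java inside the connector),
      the following Python sketch shows how an ``insert`` event payload
      can be translated into a MongoDB write operation; the
      ``to_write_model`` helper name is invented for this example:

      ```python
      def to_write_model(payload):
          # Translate a change stream event payload into a description
          # of the MongoDB write that the sink should perform.
          op = payload["operationType"]
          if op == "insert":
              # Replay the insert by writing the full document under
              # its original document key.
              return ("replaceOne", payload["documentKey"], payload["fullDocument"])
          raise ValueError(f"unhandled operation type: {op}")

      event = {
          "operationType": "insert",
          "fullDocument": {"_id": {"$oid": "600b38ad..."}, "proclaim": "Hello World!"},
          "documentKey": {"_id": {"$oid": "600b38ad..."}},
      }
      print(to_write_model(event))
      ```
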
      The sink connector picks up the Kafka message and sinks the data
      into MongoDB. You can retrieve the document from the
      ``CDCTutorial.Destination`` namespace by running the
      following command in the MongoDB shell you started in **Shell2**:

      .. code-block:: javascript

         db.Destination.find()

      You should see the following document returned as the result:

      .. code-block:: javascript
         :copyable: false

         [
           {
             _id: ObjectId("600b38a..."),
             proclaim: 'Hello World!'
           }
         ]

   .. step:: (Optional) Generate Additional Changes

      Try removing documents from the ``CDCTutorial.Source`` namespace
      by running the following command from the MongoDB shell:

      .. code-block:: javascript

         db.Source.deleteMany({})

      You should see the following topic message in your **Shell1**
      window:

      .. code-block:: json
         :copyable: false

         {
           "schema": { "type": "string", "optional": false },
           "payload": {
             "_id": { "_data": "8261...." },
             ...
             "operationType": "delete",
             "clusterTime": { "$timestamp": { "t": 1631108282, "i": 1 } },
             "ns": { "db": "CDCTutorial", "coll": "Source" },
             "documentKey": { "_id": { "$oid": "6138..." } }
           }
         }
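
      Note that the delete event above carries no ``fullDocument`` field;
      a CDC handler has only the ``documentKey`` to work with. As a
      conceptual Python sketch (not the connector's actual Java
      implementation, and with an invented helper name), a delete event
      maps to a delete operation on that key:

      ```python
      def to_delete_model(payload):
          # A delete event carries no fullDocument; only the documentKey
          # identifies which document to remove from the target collection.
          if payload["operationType"] != "delete":
              raise ValueError("expected a delete event")
          return ("deleteOne", payload["documentKey"])

      event = {
          "operationType": "delete",
          "ns": {"db": "CDCTutorial", "coll": "Source"},
          "documentKey": {"_id": {"$oid": "6138..."}},
      }
      print(to_delete_model(event))
      ```
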

      Run the following command to retrieve the current number of documents
      in the collection:

      .. code-block:: javascript

         db.Destination.countDocuments()

      This returns the following output, indicating the collection is empty:

      .. code-block:: none
         :copyable: false

         0

      Run the following command to exit the MongoDB shell:

      .. code-block:: none

         exit

Summary
-------

In this tutorial, you set up a source connector to capture changes to a
MongoDB collection and send them to Apache Kafka. You also configured a
sink connector with a MongoDB CDC handler to move the data from Apache
Kafka to a MongoDB collection.

Learn More
----------

Read the following resources to learn more about concepts mentioned in
this tutorial:

- :ref:`<sink-fundamentals-cdc-handler>`
- :ref:`<kafka-source-change-streams>`