@@ -203,3 +203,98 @@ async fn partitioned_table_copy_and_streams_new_data_from_new_partition() {
203203 . unwrap_or_default ( ) ;
204204 assert_eq ! ( parent_inserts. len( ) , 1 ) ;
205205}
206+
207+ /// Dropping a child partition must not emit DELETE/TRUNCATE events.
208+ #[ tokio:: test( flavor = "multi_thread" ) ]
209+ async fn partition_drop_does_not_emit_delete_or_truncate ( ) {
210+ init_test_tracing ( ) ;
211+ let database = spawn_source_database ( ) . await ;
212+
213+ let table_name = test_table_name ( "partitioned_events_drop" ) ;
214+ let partition_specs = [ ( "p1" , "from (1) to (100)" ) , ( "p2" , "from (100) to (200)" ) ] ;
215+
216+ let ( parent_table_id, _partition_table_ids) =
217+ create_partitioned_table ( & database, table_name. clone ( ) , & partition_specs)
218+ . await
219+ . expect ( "Failed to create partitioned table" ) ;
220+
221+ database
222+ . run_sql ( & format ! (
223+ "insert into {} (data, partition_key) values \
224+ ('event1', 50), ('event2', 150)",
225+ table_name. as_quoted_identifier( )
226+ ) )
227+ . await
228+ . unwrap ( ) ;
229+
230+ let publication_name = "test_partitioned_pub_drop" . to_string ( ) ;
231+ database
232+ . create_publication ( & publication_name, std:: slice:: from_ref ( & table_name) )
233+ . await
234+ . expect ( "Failed to create publication" ) ;
235+
236+ let state_store = NotifyingStore :: new ( ) ;
237+ let destination = TestDestinationWrapper :: wrap ( MemoryDestination :: new ( ) ) ;
238+
239+ let parent_sync_done = state_store
240+ . notify_on_table_state_type ( parent_table_id, TableReplicationPhaseType :: SyncDone )
241+ . await ;
242+
243+ let pipeline_id: PipelineId = random ( ) ;
244+ let mut pipeline = create_pipeline (
245+ & database. config ,
246+ pipeline_id,
247+ publication_name,
248+ state_store. clone ( ) ,
249+ destination. clone ( ) ,
250+ ) ;
251+
252+ pipeline. start ( ) . await . unwrap ( ) ;
253+ parent_sync_done. notified ( ) . await ;
254+
255+ let events_before = destination. get_events ( ) . await ;
256+ let grouped_before = group_events_by_type_and_table_id ( & events_before) ;
257+ let del_before = grouped_before
258+ . get ( & ( EventType :: Delete , parent_table_id) )
259+ . map ( |v| v. len ( ) )
260+ . unwrap_or ( 0 ) ;
261+ let trunc_before = grouped_before
262+ . get ( & ( EventType :: Truncate , parent_table_id) )
263+ . map ( |v| v. len ( ) )
264+ . unwrap_or ( 0 ) ;
265+
266+ // Detach and drop one child partition (DDL should not generate DML events)
267+ let child_p1_name = format ! ( "{}_{}" , table_name. name, "p1" ) ;
268+ let child_p1_qualified = format ! ( "{}.{}" , table_name. schema, child_p1_name) ;
269+ database
270+ . run_sql ( & format ! (
271+ "alter table {} detach partition {}" ,
272+ table_name. as_quoted_identifier( ) ,
273+ child_p1_qualified
274+ ) )
275+ . await
276+ . unwrap ( ) ;
277+ database
278+ . run_sql ( & format ! ( "drop table {}" , child_p1_qualified) )
279+ . await
280+ . unwrap ( ) ;
281+
282+ let _ = pipeline. shutdown_and_wait ( ) . await ;
283+
284+ let events_after = destination. get_events ( ) . await ;
285+ let grouped_after = group_events_by_type_and_table_id ( & events_after) ;
286+ let del_after = grouped_after
287+ . get ( & ( EventType :: Delete , parent_table_id) )
288+ . map ( |v| v. len ( ) )
289+ . unwrap_or ( 0 ) ;
290+ let trunc_after = grouped_after
291+ . get ( & ( EventType :: Truncate , parent_table_id) )
292+ . map ( |v| v. len ( ) )
293+ . unwrap_or ( 0 ) ;
294+
295+ assert_eq ! ( del_after, del_before, "Partition drop must not emit DELETE events" ) ;
296+ assert_eq ! (
297+ trunc_after, trunc_before,
298+ "Partition drop must not emit TRUNCATE events"
299+ ) ;
300+ }
0 commit comments