@@ -410,47 +410,53 @@ impl PgReplicationClient {
410410 & self ,
411411 publication_name : & str ,
412412 ) -> EtlResult < Vec < TableId > > {
413- // Prefer pg_publication_rel (explicit tables in the publication, including partition roots)
414- let rel_query = format ! (
415- r#"select r.prrelid as oid
416- from pg_publication_rel r
417- join pg_publication p on p.oid = r.prpubid
418- where p.pubname = {}"# ,
419- quote_literal( publication_name)
420- ) ;
421-
422- let mut table_ids = vec ! [ ] ;
423- let mut has_rows = false ;
424- for msg in self . client . simple_query ( & rel_query) . await ? {
425- if let SimpleQueryMessage :: Row ( row) = msg {
426- has_rows = true ;
427- let table_id =
428- Self :: get_row_value :: < TableId > ( & row, "oid" , "pg_publication_rel" ) . await ?;
429- table_ids. push ( table_id) ;
430- }
431- }
432-
433- if has_rows {
434- return Ok ( table_ids) ;
435- }
436-
437- // Fallback to pg_publication_tables (expanded view), used for publications like FOR ALL TABLES
438- let publication_query = format ! (
439- "select c.oid from pg_publication_tables pt
440- join pg_class c on c.relname = pt.tablename
441- join pg_namespace n on n.oid = c.relnamespace AND n.nspname = pt.schemaname
442- where pt.pubname = {};" ,
443- quote_literal( publication_name)
413+ let query = format ! (
414+ r#"
415+ with recursive has_rel as (
416+ select exists(
417+ select 1
418+ from pg_publication_rel r
419+ join pg_publication p on p.oid = r.prpubid
420+ where p.pubname = {pub}
421+ ) as has
422+ ),
423+ pub_tables as (
424+ select r.prrelid as oid
425+ from pg_publication_rel r
426+ join pg_publication p on p.oid = r.prpubid
427+ where p.pubname = {pub} and (select has from has_rel)
428+ union all
429+ select c.oid
430+ from pg_publication_tables pt
431+ join pg_class c on c.relname = pt.tablename
432+ join pg_namespace n on n.oid = c.relnamespace and n.nspname = pt.schemaname
433+ where pt.pubname = {pub} and not (select has from has_rel)
434+ ),
435+ recurse(relid) as (
436+ select oid from pub_tables
437+ union all
438+ select i.inhparent
439+ from pg_inherits i
440+ join recurse r on r.relid = i.inhrelid
441+ )
442+ select distinct relid as oid
443+ from recurse r
444+ where not exists (
445+ select 1 from pg_inherits i where i.inhrelid = r.relid
446+ );
447+ "# ,
448+ pub = quote_literal( publication_name)
444449 ) ;
445450
446- for msg in self . client . simple_query ( & publication_query) . await ? {
451+ let mut roots = vec ! [ ] ;
452+ for msg in self . client . simple_query ( & query) . await ? {
447453 if let SimpleQueryMessage :: Row ( row) = msg {
448454 let table_id = Self :: get_row_value :: < TableId > ( & row, "oid" , "pg_class" ) . await ?;
449- table_ids . push ( table_id) ;
455+ roots . push ( table_id) ;
450456 }
451457 }
452458
453- Ok ( table_ids )
459+ Ok ( roots )
454460 }
455461
456462 /// Starts a logical replication stream from the specified publication and slot.
0 commit comments