@@ -661,7 +661,8 @@ impl PgReplicationClient {
661661 when 0 then true
662662 else (a.attnum in (select * from pub_attrs))
663663 end
664- )" ,
664+ )"
665+ . to_string ( ) ,
665666 )
666667 } else {
667668 // Postgres 14 or earlier or unknown, fallback to no column-level filtering
@@ -676,20 +677,90 @@ impl PgReplicationClient {
676677 )" ,
677678 publication = quote_literal( publication) ,
678679 ) ,
679- "and (select count(*) from pub_table) > 0" ,
680+ format ! (
681+ "and ((select count(*) from pub_table) > 0 or exists(
682+ -- Also allow if parent table is in publication (for partitioned tables)
683+ select 1 from pg_inherits i
684+ join pg_publication_rel r on r.prrelid = i.inhparent
685+ join pg_publication p on p.oid = r.prpubid
686+ where i.inhrelid = {table_id} and p.pubname = {publication}
687+ ))" ,
688+ publication = quote_literal( publication) ,
689+ ) ,
680690 )
681691 }
682692 } else {
683- ( "" . into ( ) , "" )
693+ ( "" . to_string ( ) , "" . to_string ( ) )
694+ } ;
695+
696+ let has_pub_cte = !pub_cte. is_empty ( ) ;
697+
698+ let cte_prefix = if has_pub_cte {
699+ // If there's already a pub_cte WITH clause, add our CTEs to it with a comma
700+ format ! ( "{pub_cte}," )
701+ } else {
702+ // If no pub_cte, start our own WITH clause (no need for RECURSIVE)
703+ "with " . to_string ( )
684704 } ;
685705
686706 let column_info_query = format ! (
687- "{pub_cte}
688- select a.attname,
707+ "{cte_prefix}
708+ -- Find direct parent of current table (if it's a partition)
709+ direct_parent as (
710+ select i.inhparent as parent_oid
711+ from pg_inherits i
712+ where i.inhrelid = {table_id}::oid
713+ limit 1
714+ ),
715+ -- Get parent table's primary key columns
716+ parent_pk_cols as (
717+ select array_agg(a.attname order by x.n) as pk_column_names
718+ from pg_constraint con
719+ join unnest(con.conkey) with ordinality as x(attnum, n) on true
720+ join pg_attribute a on a.attrelid = con.conrelid and a.attnum = x.attnum
721+ join direct_parent dp on con.conrelid = dp.parent_oid
722+ where con.contype = 'p'
723+ group by con.conname
724+ ),
725+ -- Check if current table has a unique index on the parent PK columns
726+ partition_has_pk_index as (
727+ select case
728+ when exists (select 1 from direct_parent)
729+ and exists (select 1 from parent_pk_cols)
730+ and exists (
731+ -- Check if there's a unique, valid index on the parent PK columns
732+ select 1
733+ from pg_index ix
734+ cross join parent_pk_cols pk
735+ where ix.indrelid = {table_id}::oid
736+ and ix.indisunique = true
737+ and ix.indisvalid = true
738+ and array(
739+ select a.attname
740+ from unnest(ix.indkey) with ordinality k(attnum, ord)
741+ join pg_attribute a on a.attrelid = ix.indrelid and a.attnum = k.attnum
742+ where ord <= ix.indnkeyatts -- exclude INCLUDE columns
743+ order by ord
744+ ) = pk.pk_column_names
745+ ) then true
746+ else false
747+ end as has_inherited_pk
748+ )
749+ SELECT a.attname,
689750 a.atttypid,
690751 a.atttypmod,
691752 a.attnotnull,
692- coalesce(i.indisprimary, false) as primary
753+ case
754+ -- First check for direct primary key
755+ when coalesce(i.indisprimary, false) = true then true
756+ -- Then check for inherited primary key from partitioned table parent
757+ when (select has_inherited_pk from partition_has_pk_index) = true
758+ and exists (
759+ select 1 from parent_pk_cols pk
760+ where a.attname = any(pk.pk_column_names)
761+ ) then true
762+ else false
763+ end as primary
693764 from pg_attribute a
694765 left join pg_index i
695766 on a.attrelid = i.indrelid
0 commit comments