@@ -197,6 +197,7 @@ all_tests() ->
197
197
requeue_multiple_true ,
198
198
requeue_multiple_false ,
199
199
subscribe_from_each ,
200
+ dont_leak_file_handles ,
200
201
leader_health_check
201
202
].
202
203
@@ -1641,6 +1642,54 @@ subscribe_from_each(Config) ->
1641
1642
1642
1643
ok .
1643
1644
1645
+ dont_leak_file_handles (Config ) ->
1646
+
1647
+ [Server0 | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1648
+
1649
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1650
+ # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
1651
+ QQ = ? config (queue_name , Config ),
1652
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1653
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1654
+ [begin
1655
+ publish_confirm (Ch , QQ )
1656
+ end || _ <- Servers ],
1657
+ timer :sleep (100 ),
1658
+ % % roll the wal to force consumer messages to be read from disk
1659
+ [begin
1660
+ ok = rpc :call (S , ra_log_wal , force_roll_over , [ra_log_wal ])
1661
+ end || S <- Servers ],
1662
+ timer :sleep (256 ),
1663
+
1664
+ C = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1665
+ [_ , NCh1 ] = rpc :call (Server0 , rabbit_channel , list , []),
1666
+ qos (C , 1 , false ),
1667
+ subscribe (C , QQ , false ),
1668
+ [begin
1669
+ receive
1670
+ {# 'basic.deliver' {delivery_tag = DeliveryTag }, _ } ->
1671
+ amqp_channel :call (C , # 'basic.ack' {delivery_tag = DeliveryTag })
1672
+ after 5000 ->
1673
+ flush (1 ),
1674
+ ct :fail (" basic.deliver timeout" )
1675
+ end
1676
+ end || _ <- Servers ],
1677
+ flush (1 ),
1678
+ [{_ , MonBy2 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1679
+ NumMonRefsBefore = length ([M || M <- MonBy2 , is_reference (M )]),
1680
+ % % delete queue
1681
+ ? assertMatch (# 'queue.delete_ok' {},
1682
+ amqp_channel :call (Ch , # 'queue.delete' {queue = QQ })),
1683
+ [{_ , MonBy3 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1684
+ NumMonRefsAfter = length ([M || M <- MonBy3 , is_reference (M )]),
1685
+ % % this isn't an ideal way to assert this but every file handle creates
1686
+ % % a monitor that (currenlty?) is a reference so we assert that we have
1687
+ % % fewer reference monitors after
1688
+ ? assert (NumMonRefsAfter < NumMonRefsBefore ),
1689
+
1690
+ rabbit_ct_client_helpers :close_channel (C ),
1691
+ ok .
1692
+
1644
1693
gh_12635 (Config ) ->
1645
1694
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
1646
1695
[Server0 , _Server1 , Server2 ] =
@@ -4949,3 +4998,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
4949
4998
ensure_qq_proc_dead (Config , Server , RaName )
4950
4999
end .
4951
5000
5001
+ lsof_rpc () ->
5002
+ Cmd = rabbit_misc :format (
5003
+ " lsof -p ~ts " , [os :getpid ()]),
5004
+ os :cmd (Cmd ).
0 commit comments