@@ -197,6 +197,7 @@ all_tests() ->
197
197
requeue_multiple_true ,
198
198
requeue_multiple_false ,
199
199
subscribe_from_each ,
200
+ dont_leak_file_handles ,
200
201
leader_health_check
201
202
].
202
203
@@ -1629,6 +1630,54 @@ subscribe_from_each(Config) ->
1629
1630
1630
1631
ok .
1631
1632
1633
+ dont_leak_file_handles (Config ) ->
1634
+
1635
+ [Server0 | _ ] = Servers = rabbit_ct_broker_helpers :get_node_configs (Config , nodename ),
1636
+
1637
+ Ch = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1638
+ # 'confirm.select_ok' {} = amqp_channel :call (Ch , # 'confirm.select' {}),
1639
+ QQ = ? config (queue_name , Config ),
1640
+ ? assertEqual ({'queue.declare_ok' , QQ , 0 , 0 },
1641
+ declare (Ch , QQ , [{<<" x-queue-type" >>, longstr , <<" quorum" >>}])),
1642
+ [begin
1643
+ publish_confirm (Ch , QQ )
1644
+ end || _ <- Servers ],
1645
+ timer :sleep (100 ),
1646
+ % % roll the wal to force consumer messages to be read from disk
1647
+ [begin
1648
+ ok = rpc :call (S , ra_log_wal , force_roll_over , [ra_log_wal ])
1649
+ end || S <- Servers ],
1650
+ timer :sleep (256 ),
1651
+
1652
+ C = rabbit_ct_client_helpers :open_channel (Config , Server0 ),
1653
+ [_ , NCh1 ] = rpc :call (Server0 , rabbit_channel , list , []),
1654
+ qos (C , 1 , false ),
1655
+ subscribe (C , QQ , false ),
1656
+ [begin
1657
+ receive
1658
+ {# 'basic.deliver' {delivery_tag = DeliveryTag }, _ } ->
1659
+ amqp_channel :call (C , # 'basic.ack' {delivery_tag = DeliveryTag })
1660
+ after 5000 ->
1661
+ flush (1 ),
1662
+ ct :fail (" basic.deliver timeout" )
1663
+ end
1664
+ end || _ <- Servers ],
1665
+ flush (1 ),
1666
+ [{_ , MonBy2 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1667
+ NumMonRefsBefore = length ([M || M <- MonBy2 , is_reference (M )]),
1668
+ % % delete queue
1669
+ ? assertMatch (# 'queue.delete_ok' {},
1670
+ amqp_channel :call (Ch , # 'queue.delete' {queue = QQ })),
1671
+ [{_ , MonBy3 }] = rpc :call (Server0 , erlang , process_info , [NCh1 , [monitored_by ]]),
1672
+ NumMonRefsAfter = length ([M || M <- MonBy3 , is_reference (M )]),
1673
+ % % this isn't an ideal way to assert this but every file handle creates
1674
+ % % a monitor that (currenlty?) is a reference so we assert that we have
1675
+ % % fewer reference monitors after
1676
+ ? assert (NumMonRefsAfter < NumMonRefsBefore ),
1677
+
1678
+ rabbit_ct_client_helpers :close_channel (C ),
1679
+ ok .
1680
+
1632
1681
gh_12635 (Config ) ->
1633
1682
% https://github.com/rabbitmq/rabbitmq-server/issues/12635
1634
1683
[Server0 , _Server1 , Server2 ] =
@@ -4946,3 +4995,7 @@ ensure_qq_proc_dead(Config, Server, RaName) ->
4946
4995
ensure_qq_proc_dead (Config , Server , RaName )
4947
4996
end .
4948
4997
4998
+ lsof_rpc () ->
4999
+ Cmd = rabbit_misc :format (
5000
+ " lsof -p ~ts " , [os :getpid ()]),
5001
+ os :cmd (Cmd ).
0 commit comments