Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 67 additions & 67 deletions deps/rabbitmq_management_agent/src/rabbit_mgmt_gc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ handle_info(start_gc, State) ->
gc_queues(),
gc_exchanges(),
gc_nodes(),
{noreply, start_timer(State)}.
{noreply, start_timer(State), hibernate}.

terminate(_Reason, #state{timer = TRef}) ->
_ = erlang:cancel_timer(TRef),
Expand All @@ -56,12 +56,12 @@ gc_connections() ->

gc_vhosts() ->
VHosts = rabbit_vhost:list(),
GbSet = gb_sets:from_list(VHosts),
gc_entity(vhost_stats_coarse_conn_stats, GbSet),
gc_entity(vhost_stats_fine_stats, GbSet),
gc_entity(vhost_msg_stats, GbSet),
gc_entity(vhost_msg_rates, GbSet),
gc_entity(vhost_stats_deliver_stats, GbSet).
Set = sets:from_list(VHosts, [{version, 2}]),
gc_entity(vhost_stats_coarse_conn_stats, Set),
gc_entity(vhost_stats_fine_stats, Set),
gc_entity(vhost_msg_stats, Set),
gc_entity(vhost_msg_rates, Set),
gc_entity(vhost_stats_deliver_stats, Set).

gc_channels() ->
gc_process(channel_created_stats),
Expand All @@ -73,45 +73,45 @@ gc_channels() ->

gc_queues() ->
Queues = rabbit_amqqueue:list_names(),
GbSet = gb_sets:from_list(Queues),
Set = sets:from_list(Queues, [{version, 2}]),
LocalQueues = rabbit_amqqueue:list_local_names(),
LocalGbSet = gb_sets:from_list(LocalQueues),
gc_entity(queue_stats_publish, GbSet),
gc_entity(queue_stats, LocalGbSet),
gc_entity(queue_basic_stats, LocalGbSet),
gc_entity(queue_msg_stats, LocalGbSet),
gc_entity(queue_process_stats, LocalGbSet),
gc_entity(queue_msg_rates, LocalGbSet),
gc_entity(queue_stats_deliver_stats, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, GbSet),
gc_process_and_entity(consumer_stats_queue_index, GbSet),
gc_process_and_entity(consumer_stats_channel_index, GbSet),
gc_process_and_entity(consumer_stats, GbSet),
gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats, GbSet),
gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, GbSet),
ExchangeGbSet = gb_sets:from_list(rabbit_exchange:list_names()),
gc_entities(queue_exchange_stats_publish, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_stats_publish_queue_index, GbSet, ExchangeGbSet),
gc_entities(queue_exchange_stats_publish_exchange_index, GbSet, ExchangeGbSet).
LocalSet = sets:from_list(LocalQueues, [{version, 2}]),
gc_entity(queue_stats_publish, Set),
gc_entity(queue_stats, LocalSet),
gc_entity(queue_basic_stats, LocalSet),
gc_entity(queue_msg_stats, LocalSet),
gc_entity(queue_process_stats, LocalSet),
gc_entity(queue_msg_rates, LocalSet),
gc_entity(queue_stats_deliver_stats, Set),
gc_process_and_entity(channel_queue_stats_deliver_stats_queue_index, Set),
gc_process_and_entity(consumer_stats_queue_index, Set),
gc_process_and_entity(consumer_stats_channel_index, Set),
gc_process_and_entity(consumer_stats, Set),
gc_process_and_entity(channel_exchange_stats_fine_stats_channel_index, Set),
gc_process_and_entity(channel_queue_stats_deliver_stats, Set),
gc_process_and_entity(channel_queue_stats_deliver_stats_channel_index, Set),
ExchangeSet = sets:from_list(rabbit_exchange:list_names(), [{version, 2}]),
gc_entities(queue_exchange_stats_publish, Set, ExchangeSet),
gc_entities(queue_exchange_stats_publish_queue_index, Set, ExchangeSet),
gc_entities(queue_exchange_stats_publish_exchange_index, Set, ExchangeSet).

gc_exchanges() ->
Exchanges = rabbit_exchange:list_names(),
GbSet = gb_sets:from_list(Exchanges),
gc_entity(exchange_stats_publish_in, GbSet),
gc_entity(exchange_stats_publish_out, GbSet),
gc_entity(channel_exchange_stats_fine_stats_exchange_index, GbSet),
gc_process_and_entity(channel_exchange_stats_fine_stats, GbSet).
Set = sets:from_list(Exchanges, [{version, 2}]),
gc_entity(exchange_stats_publish_in, Set),
gc_entity(exchange_stats_publish_out, Set),
gc_entity(channel_exchange_stats_fine_stats_exchange_index, Set),
gc_process_and_entity(channel_exchange_stats_fine_stats, Set).

gc_nodes() ->
Nodes = rabbit_nodes:list_members(),
GbSet = gb_sets:from_list(Nodes),
gc_entity(node_stats, GbSet),
gc_entity(node_coarse_stats, GbSet),
gc_entity(node_persister_stats, GbSet),
gc_entity(node_node_coarse_stats_node_index, GbSet),
gc_entity(node_node_stats, GbSet),
gc_entity(node_node_coarse_stats, GbSet).
Set = sets:from_list(Nodes, [{version, 2}]),
gc_entity(node_stats, Set),
gc_entity(node_coarse_stats, Set),
gc_entity(node_persister_stats, Set),
gc_entity(node_node_coarse_stats_node_index, Set),
gc_entity(node_node_stats, Set),
gc_entity(node_node_coarse_stats, Set).

gc_process(Table) ->
ets:foldl(fun({{Pid, _} = Key, _}, none) ->
Expand All @@ -133,61 +133,61 @@ gc_process(Pid, Table, Key) ->
none
end.

gc_entity(Table, GbSet) ->
gc_entity(Table, Set) ->
ets:foldl(fun({{_, Id} = Key, _}, none) when Table == node_node_stats ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({{{_, Id}, _} = Key, _}, none) when Table == node_node_coarse_stats ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({{Id, _} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({{Id, _} = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet)
gc_entity(Id, Table, Key, Set)
end, none, Table).

gc_entity(Id, Table, Key, GbSet) ->
case gb_sets:is_member(Id, GbSet) of
gc_entity(Id, Table, Key, Set) ->
case sets:is_element(Id, Set) of
true ->
none;
false ->
ets:delete(Table, Key),
none
end.

gc_process_and_entity(Table, GbSet) ->
gc_process_and_entity(Table, Set) ->
ets:foldl(fun({{Id, Pid, _} = Key, _}, none) when Table == consumer_stats ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
gc_process_and_entity(Id, Pid, Table, Key, Set);
({Id = Key, {_, Pid, _}} = Object, none)
when Table == consumer_stats_queue_index ->
gc_object(Pid, Table, Object),
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({Pid = Key, {Id, _, _}} = Object, none)
when Table == consumer_stats_channel_index ->
gc_object(Id, Table, Object, GbSet),
gc_object(Id, Table, Object, Set),
gc_process(Pid, Table, Key);
({Id = Key, {{Pid, _}, _}} = Object, none)
when Table == channel_exchange_stats_fine_stats_exchange_index;
Table == channel_queue_stats_deliver_stats_queue_index ->
gc_object(Pid, Table, Object),
gc_entity(Id, Table, Key, GbSet);
gc_entity(Id, Table, Key, Set);
({Pid = Key, {{_, Id}, _}} = Object, none)
when Table == channel_exchange_stats_fine_stats_channel_index;
Table == channel_queue_stats_deliver_stats_channel_index ->
gc_object(Id, Table, Object, GbSet),
gc_object(Id, Table, Object, Set),
gc_process(Pid, Table, Key);
({{{Pid, Id}, _} = Key, _}, none)
when Table == channel_queue_stats_deliver_stats;
Table == channel_exchange_stats_fine_stats ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
gc_process_and_entity(Id, Pid, Table, Key, Set);
({{{Pid, Id}, _} = Key, _, _, _, _, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet);
gc_process_and_entity(Id, Pid, Table, Key, Set);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
gc_process_and_entity(Id, Pid, Table, Key, Set)
end, none, Table).

gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
gc_process_and_entity(Id, Pid, Table, Key, Set) ->
case rabbit_misc:is_process_alive(Pid) andalso sets:is_element(Id, Set) of
true ->
none;
false ->
Expand All @@ -204,26 +204,26 @@ gc_object(Pid, Table, Object) ->
none
end.

gc_object(Id, Table, Object, GbSet) ->
case gb_sets:is_member(Id, GbSet) of
gc_object(Id, Table, Object, Set) ->
case sets:is_element(Id, Set) of
true ->
none;
false ->
ets:delete_object(Table, Object),
none
end.

gc_entities(Table, QueueGbSet, ExchangeGbSet) ->
gc_entities(Table, QueueSet, ExchangeSet) ->
ets:foldl(fun({{{Q, X}, _} = Key, _}, none)
when Table == queue_exchange_stats_publish ->
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet);
gc_entity(Q, Table, Key, QueueSet),
gc_entity(X, Table, Key, ExchangeSet);
({Q, {{_, X}, _}} = Object, none)
when Table == queue_exchange_stats_publish_queue_index ->
gc_object(X, Table, Object, ExchangeGbSet),
gc_entity(Q, Table, Q, QueueGbSet);
gc_object(X, Table, Object, ExchangeSet),
gc_entity(Q, Table, Q, QueueSet);
({X, {{Q, _}, _}} = Object, none)
when Table == queue_exchange_stats_publish_exchange_index ->
gc_object(Q, Table, Object, QueueGbSet),
gc_entity(X, Table, X, ExchangeGbSet)
gc_object(Q, Table, Object, QueueSet),
gc_entity(X, Table, X, ExchangeSet)
end, none, Table).