Skip to content

Commit c36d47e

Browse files
feat(metrics): Add fork-safety to SynchronousMeasurementConsumer
Implement post-fork reinitialization of threading locks in the metrics measurement consumer to prevent deadlocks and data duplication in forked child processes.
1 parent 36ac612 commit c36d47e

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed

opentelemetry-sdk/src/opentelemetry/sdk/metrics/_internal/measurement_consumer.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
# pylint: disable=unused-import
1616

17+
import os
1718
from abc import ABC, abstractmethod
1819
from threading import Lock
1920
from time import time_ns
@@ -76,8 +77,25 @@ def __init__(
7677
self._async_instruments: List[
7778
"opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
7879
] = []
80+
if hasattr(os, "register_at_fork"):
81+
os.register_at_fork(
82+
after_in_child=self._at_fork_reinit
83+
) # pylint: disable=protected-access
84+
85+
def _at_fork_reinit(self):
86+
"""Reinitialize lock in child process after fork"""
87+
self._lock._at_fork_reinit()
88+
# Lazy reinitialization of storages on first use post fork. This is
89+
# done to avoid the overhead of reinitializing the storages on
90+
# every fork.
91+
self._needs_storage_reinit = True
92+
self._async_instruments.clear()
7993

8094
def consume_measurement(self, measurement: Measurement) -> None:
95+
if getattr(self, '_needs_storage_reinit', False):
96+
self._reinit_storages()
97+
self._needs_storage_reinit = False
98+
8199
should_sample_exemplar = (
82100
self._sdk_config.exemplar_filter.should_sample(
83101
measurement.value,
@@ -105,6 +123,11 @@ def collect(
105123
metric_reader: "opentelemetry.sdk.metrics.MetricReader",
106124
timeout_millis: float = 10_000,
107125
) -> Optional[Iterable[Metric]]:
126+
127+
if getattr(self, '_needs_storage_reinit', False):
128+
self._reinit_storages()
129+
self._needs_storage_reinit = False
130+
108131
with self._lock:
109132
metric_reader_storage = self._reader_storages[metric_reader]
110133
# for now, just use the defaults
@@ -143,3 +166,11 @@ def collect(
143166
result = self._reader_storages[metric_reader].collect()
144167

145168
return result
169+
170+
def _reinit_storages(self):
171+
# Reinitialize the storages. Use to reinitialize the storages after a
172+
# fork to avoid duplicate data points.
173+
with self._lock:
174+
for storage in self._reader_storages.values():
175+
storage._lock._at_fork_reinit()
176+
storage._instrument_view_instrument_matches.clear()

0 commit comments

Comments
 (0)