8 changes: 6 additions & 2 deletions gprofiler/metadata/py_module_version.py
@@ -57,7 +57,11 @@ def _get_packages_dir(file_path: str) -> Optional[str]:
 def _get_metadata(dist: pkg_resources.Distribution) -> Dict[str, str]:
     """Based on pip._internal.utils.get_metadata"""
     metadata_name = "METADATA"
-    if isinstance(dist, pkg_resources.DistInfoDistribution) and dist.has_metadata(metadata_name):
+    # Check if this distribution supports modern METADATA format
+    # Some distributions have METADATA, others only have PKG-INFO
+    is_dist_info = hasattr(dist, "_path") or dist.__class__.__name__ in ("DistInfoDistribution", "Distribution")
+
+    if is_dist_info and dist.has_metadata(metadata_name):
         metadata = dist.get_metadata(metadata_name)
     elif dist.has_metadata("PKG-INFO"):
         metadata_name = "PKG-INFO"
@@ -120,7 +124,7 @@ def _files_from_legacy(dist: pkg_resources.Distribution) -> Optional[Iterator[st
         return None
     paths = (p for p in text.splitlines(keepends=False) if p)
     root = dist.location
-    info = dist.egg_info
+    info = getattr(dist, "egg_info", None)
     if root is None or info is None:
         return paths
     try:
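A note on the pattern above: both hunks replace assumptions about the concrete pkg_resources distribution type with lookups that degrade gracefully when an attribute is missing, e.g. getattr(dist, "egg_info", None) instead of dist.egg_info. A minimal sketch of the difference, using a hypothetical DummyDist stand-in rather than a real pkg_resources object:

# Minimal sketch; DummyDist is a hypothetical distribution-like object
# that lacks "egg_info", as some distributions encountered in the wild do.
class DummyDist:
    location = "/tmp/site-packages"

dist = DummyDist()

# Plain attribute access raises AttributeError when the attribute is absent:
try:
    info = dist.egg_info
except AttributeError:
    info = None

# getattr() with a default expresses the same fallback in one line:
info = getattr(dist, "egg_info", None)
print(info)  # None, instead of an unhandled AttributeError

The class-name check in the METADATA branch follows the same idea: rather than an isinstance() check against pkg_resources.DistInfoDistribution, it tolerates distribution objects whose class merely looks like one.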
27 changes: 23 additions & 4 deletions gprofiler/utils/__init__.py
@@ -136,6 +136,7 @@ def start_process(
         **kwargs,
     )

+    _processes.append(process)
     return process


@@ -190,10 +191,28 @@ def reap_process(process: Popen) -> Tuple[int, bytes, bytes]:
     (see https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait, and see
     ticket https://github.com/intel/gprofiler/issues/744).
     """
-    stdout, stderr = process.communicate()
-    returncode = process.poll()
-    assert returncode is not None  # only None if child has not terminated
-    return returncode, stdout, stderr
+    # If process is already terminated, don't try to communicate
+    if process.poll() is not None:
+        # Process already exited, just collect any remaining output
+        try:
+            stdout = process.stdout.read() if process.stdout and not process.stdout.closed else b""
+            stderr = process.stderr.read() if process.stderr and not process.stderr.closed else b""
+        except Exception:
+            stdout, stderr = b"", b""
+        return process.returncode, stdout, stderr
+
+    # Process still running, try normal communicate
+    try:
+        stdout, stderr = process.communicate()
+        return process.returncode, stdout, stderr
+    except ValueError as e:
+        if "flush of closed file" in str(e):
+            # Handle the race condition gracefully
+            returncode = process.wait()
+            stdout, stderr = b"", b""
+            return returncode, stdout, stderr
+        else:
+            raise


 def _kill_and_reap_process(process: Popen, kill_signal: signal.Signals) -> Tuple[int, bytes, bytes]:
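The key change in reap_process() is checking poll() before calling communicate(): once the child has already exited and its pipes have been handled elsewhere, communicate() can fail with a "flush of closed file" ValueError, which the handler above also guards against. A minimal standalone sketch of the same ordering (not the gprofiler helper itself, and assuming a Unix-like system where "echo" exists):

import subprocess

proc = subprocess.Popen(["echo", "hello"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
proc.wait()  # simulate a child that has already terminated

if proc.poll() is not None:
    # Child already exited: drain the pipes directly instead of calling
    # communicate(), which the code above treats as unsafe in this state.
    out = proc.stdout.read() if proc.stdout and not proc.stdout.closed else b""
    err = proc.stderr.read() if proc.stderr and not proc.stderr.closed else b""
else:
    out, err = proc.communicate()

print(proc.returncode, out, err)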
64 changes: 56 additions & 8 deletions gprofiler/utils/perf_process.py
@@ -37,6 +37,8 @@ class PerfProcess:
     # default number of pages used by "perf record" when perf_event_mlock_kb=516
     # we use double for dwarf.
     _MMAP_SIZES = {"fp": 129, "dwarf": 257}
+    _RSS_GROWTH_THRESHOLD = 100 * 1024 * 1024  # 100MB in bytes
+    _BASELINE_COLLECTION_COUNT = 3  # Number of function calls to collect RSS before setting baseline

Review comment (Contributor), on _BASELINE_COLLECTION_COUNT:
    Thanks, I would suggest having some data to back the configuration of this collection count baseline.


     def __init__(
         self,
@@ -65,6 +67,8 @@ def __init__(
         self._extra_args = extra_args
         self._switch_timeout_s = switch_timeout_s
         self._process: Optional[Popen] = None
+        self._baseline_rss: Optional[int] = None
+        self._collected_rss_values: List[int] = []

     @property
     def _log_name(self) -> str:
@@ -132,6 +136,7 @@ def is_running(self) -> bool:

     def restart(self) -> None:
         self.stop()
+        self._clear_baseline_data()
         self.start()

     def restart_if_not_running(self) -> None:
@@ -145,18 +150,50 @@ def restart_if_not_running(self) -> None:
     def restart_if_rss_exceeded(self) -> None:
         """Checks if perf used memory exceeds threshold, and if it does, restarts perf"""
         assert self._process is not None
-        perf_rss = Process(self._process.pid).memory_info().rss
-        if (
-            time.monotonic() - self._start_time >= self._RESTART_AFTER_S
-            and perf_rss >= self._PERF_MEMORY_USAGE_THRESHOLD
-        ):
+        current_rss = Process(self._process.pid).memory_info().rss
+
+        # Collect RSS readings for baseline calculation
+        if self._baseline_rss is None:
+            self._collected_rss_values.append(current_rss)
+
+            if len(self._collected_rss_values) < self._BASELINE_COLLECTION_COUNT:
+                return  # Still collecting, don't check thresholds yet
+
+            # Calculate average from collected samples
+            self._baseline_rss = sum(self._collected_rss_values) // len(self._collected_rss_values)
+            logger.debug(
+                f"RSS baseline established for {self._log_name}",
+                collected_samples=self._collected_rss_values,
+                calculated_baseline=self._baseline_rss,
+            )
+
+        # Now check memory thresholds with established baseline
+        memory_growth = current_rss - self._baseline_rss
+        time_elapsed = time.monotonic() - self._start_time
+
+        should_restart_time_based = (
+            time_elapsed >= self._RESTART_AFTER_S and current_rss >= self._PERF_MEMORY_USAGE_THRESHOLD
+        )
+        should_restart_growth_based = memory_growth > self._RSS_GROWTH_THRESHOLD

Review comment (Contributor), on should_restart_growth_based:
    Thanks, looks good. Can we get some metrics behind this: what is the average growth rate of perf from
    initialization to post-profiling on a busy vs. a non-busy system?

Review comment (Contributor, author):
    The system I tested on is an AWS instance with a small number of CPUs, so our tests do not represent your
    use case. On an idle system I see around 50MB as the baseline RSS and 1-2MB of growth every duration. I
    tried running more CPU-intensive workloads, but that did not increase the RSS much because the system
    configuration has limited memory and CPUs. So it would be good if you could evaluate this change on your
    side with real use cases. Can you try that?

Review comment (Contributor, @prashantbytesyntax, Oct 16, 2025):
    Yes, this would need testing in environments where a lot of processes (over 1k-1.5k) are running. I would
    suggest we start by testing the root-cause fix in #1002 and then test this fix.
    @ashokbytebytego ^ ^


+        if should_restart_time_based or should_restart_growth_based:
+            restart_cause = "time+memory limits" if should_restart_time_based else "memory growth"
             logger.debug(
-                f"Restarting {self._log_name} due to memory exceeding limit",
-                limit_rss=self._PERF_MEMORY_USAGE_THRESHOLD,
-                perf_rss=perf_rss,
+                f"Restarting {self._log_name} due to {restart_cause}",
+                current_rss=current_rss,
+                baseline_rss=self._baseline_rss,
+                memory_growth=memory_growth,
+                time_elapsed=time_elapsed,
+                threshold_limit=self._PERF_MEMORY_USAGE_THRESHOLD,
             )
+            self._clear_baseline_data()
             self.restart()

+    def _clear_baseline_data(self) -> None:
+        """Reset baseline tracking for next process instance"""
+        self._baseline_rss = None
+        self._collected_rss_values = []
+
     def switch_output(self) -> None:
         assert self._process is not None, "profiling not started!"
         # clean stale files (can be emitted by perf timing out and switching output file).
@@ -191,6 +228,17 @@ def wait_and_script(self) -> str:
             # (unlike Popen.communicate())
             if self._process is not None and self._process.stderr is not None:
                 logger.debug(f"{self._log_name} run output", perf_stderr=self._process.stderr.read1())  # type: ignore
+            # Safely drain stdout buffer without interfering with error handling
+            if self._process is not None and self._process.stdout is not None:
+                try:
+                    # Use read1() to avoid blocking, but don't necessarily log it
+                    stdout_data = self._process.stdout.read1()  # type: ignore
+                    # Only log if there's unexpected stdout data (diagnostic value)
+                    if stdout_data:
+                        logger.debug(f"{self._log_name} unexpected stdout", perf_stdout=stdout_data)
+                except (OSError, IOError):
+                    # Handle case where stdout is already closed/broken
+                    pass

         try:
             inject_data = Path(f"{str(perf_data)}.inject")
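To make the new restart policy concrete, here is a small standalone sketch of the growth-based branch (the time+memory branch keeps the previous behavior). RssWatcher and its module-level constants are hypothetical illustrations, not gprofiler APIs; the real logic lives in PerfProcess.restart_if_rss_exceeded above:

from typing import List, Optional

RSS_GROWTH_THRESHOLD = 100 * 1024 * 1024  # mirrors _RSS_GROWTH_THRESHOLD (100MB)
BASELINE_COLLECTION_COUNT = 3             # mirrors _BASELINE_COLLECTION_COUNT


class RssWatcher:
    def __init__(self) -> None:
        self.baseline: Optional[int] = None
        self.samples: List[int] = []

    def should_restart(self, current_rss: int) -> bool:
        # Phase 1: average the first few readings to establish a baseline.
        if self.baseline is None:
            self.samples.append(current_rss)
            if len(self.samples) < BASELINE_COLLECTION_COUNT:
                return False  # still collecting samples
            self.baseline = sum(self.samples) // len(self.samples)
        # Phase 2: restart once growth over the baseline exceeds the threshold.
        return current_rss - self.baseline > RSS_GROWTH_THRESHOLD


watcher = RssWatcher()
for rss in [50_000_000, 51_000_000, 52_000_000, 60_000_000, 160_000_000]:
    print(rss, watcher.should_restart(rss))
# Only the last reading exceeds the growth threshold relative to the ~51MB baseline.

Averaging the first few readings keeps a single early spike from becoming the baseline, and clearing the collected samples on restart (as _clear_baseline_data() does) lets the next perf instance establish its own baseline.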