Skip to content

Conversation

@orange475
Copy link

Summary

  • Fixes a process leak in PosixCommandBasedStatisticsGetter where shell processes spawned to collect system
    statistics were not being properly reaped
  • Adds process.waitFor() call after reading command output to ensure child processes complete and are removed
    from the process table
  • Includes comprehensive unit tests to verify the fix and prevent regression

Problem

The PosixCommandBasedStatisticsGetter class executes shell commands (ps, grep) to collect memory statistics but
wasn't waiting for these processes to complete. This caused zombie processes to accumulate over time in
long-running Samza containers, potentially leading to resource exhaustion.

Solution

  • Added process.waitFor() after closing the process reader to ensure proper process cleanup
  • Implemented proper interruption handling to maintain thread interrupt status
  • Added unit tests covering normal operation, interruption scenarios, and multiple executions

Test Plan

  • Added unit tests for process reaping behavior
  • Added test for interruption handling during command execution
  • Added test for multiple command executions to verify no process accumulation
  • Run integration tests to verify no regression in statistics collection
  • Deploy to staging environment and monitor process table for zombie processes

@orange475
Copy link
Author

@xinyuiscool @shanthoosh @sborya Could you take a look at this? Thank you

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while waiting for command to complete", e);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might be good to add:

    } finally {
        executable.destroy();
    }

here just to make sure.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, PTAL

// Wait for the process to complete to prevent resource leak
try {
executable.waitFor();
} catch (InterruptedException e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before we re-interrupt, we should probably try to kill it off. e.g.

    } catch (InterruptedException e) {
      executable.destroy();
      Thread.currentThread().interrupt();
      throw new IOException("Interrupted while waiting for command to complete", e);
    }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll do destroy in finally block

psOutput.add(line);
}
}
processReader.close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of explicitly calling close, this should probably use try-with-resources instead. Also, since we're already changing this, any thoughts on draining the stderr buffer here too? Although (very) unlikely, if the process spams stderr and fills the pipe, it'll deadlock on the wait until something drains it off. e.g.

    try (BufferedReader processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
         BufferedReader errorReader = new BufferedReader(new InputStreamReader(executable.getErrorStream()))) {
        String line;
        while ((line = processReader.readLine()) != null) {
            if (!line.isEmpty()) {
                psOutput.add(line);
            }
        }
        while ((line = errorReader.readLine()) != null) {
            if (!line.isEmpty()) {
                log.error("stderr while running {}: {}", cmdArray, line);
            }
        }
    }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the stderr buffer draining.

Copy link
Contributor

@sborya sborya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@orange475
Copy link
Author

Updated

Copy link
Contributor

@bringhurst bringhurst left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Improve stderr drain performance by only record the first 100 lines
@orange475 orange475 force-pushed the fix_resource_leak branch from 0b5a014 to 7f6d456 Compare July 14, 2025 18:16

// Wait for the process to complete to prevent resource leak
try {
boolean finished = executable.waitFor(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will be the behavior if the executable did not complete within the timeout?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will throw an exception and finally destroy the executable. This is to avoid infinite wait. We could also remove this TTL to just call executable.waitFor() or with a longer default time. @bringhurst what's your thought on this?


// Wait for the process to complete to prevent resource leak
try {
boolean finished = executable.waitFor(COMMAND_TIMEOUT_SECONDS, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to make this configurable per job.

Copy link
Author

@orange475 orange475 Jul 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dont think this would be necessary at this point as we don't want to expose this to users for tuning. Make it configurable would also require a wider refactoring.

@bringhurst
Copy link
Contributor

ced9c1d

This change was discussed offline. It was updated to a minute to sync up with the metrics reporting interval. There's no need to restrict it more than that.

while ((line = processReader.readLine()) != null) {
if (!line.isEmpty()) {
psOutput.add(line);
try (BufferedReader processReader = new BufferedReader(new InputStreamReader(executable.getInputStream()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please clarify how did we have verify and test the changes end-to-end such that it addresses the leack problem?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants