From 5b62afadde4c215e793300a8773de030745600f5 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 26 Jan 2024 23:26:04 +0100 Subject: [PATCH 1/5] New metrics for ThreadPool - Introduced performance metrics for better monitoring and troubleshooting of ThreadPool. --- src/Common/ProfileEvents.cpp | 12 +++++++ src/Common/ThreadPool.cpp | 69 ++++++++++++++++++++++++++++++++++-- 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index d43d9fdcea8e..6bcdf9d8e91c 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -86,6 +86,18 @@ M(NetworkReceiveBytes, "Total number of bytes received from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \ M(NetworkSendBytes, "Total number of bytes send to network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \ \ + M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \ + M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \ + M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \ + M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ + M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \ + M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \ + M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ + M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \ + M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \ + M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \ + M(LocalThreadPoolBusyMicroseconds, "Total time threads have spent executing the actual work.") \ + \ M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \ M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \ M(DiskS3PutRequestThrottlerCount, "Number of DiskS3 PUT, COPY, POST and LIST requests passed through throttler.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index c8f1ae999698..d93859e1abc1 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -27,6 +28,22 @@ namespace CurrentMetrics extern const Metric GlobalThreadScheduled; } +namespace ProfileEvents +{ + extern const Event GlobalThreadPoolExpansions; + extern const Event GlobalThreadPoolShrinks; + extern const Event GlobalThreadPoolThreadCreationMicroseconds; + extern const Event GlobalThreadPoolLockWaitMicroseconds; + extern const Event GlobalThreadPoolJobs; + + extern const Event LocalThreadPoolExpansions; + extern const Event LocalThreadPoolShrinks; + extern const Event LocalThreadPoolThreadCreationMicroseconds; + extern const Event LocalThreadPoolLockWaitMicroseconds; + extern const Event LocalThreadPoolJobs; + extern const Event LocalThreadPoolBusyMicroseconds; +} + class JobWithPriority { public: @@ -180,14 +197,18 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: }; { + Stopwatch watch; std::unique_lock lock(mutex); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, + watch.elapsedMicroseconds()); if (CannotAllocateThreadFaultInjector::injectFault()) return on_error("fault injected"); auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; }; - if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero. + if (wait_microseconds) /// Check for optional. Condition is true if the optional is set. Even if the value is zero. { if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred)) return on_error(fmt::format("no free thread (timeout={})", *wait_microseconds)); @@ -216,7 +237,13 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: try { + Stopwatch watch2; threads.front() = Thread([this, it = threads.begin()] { worker(it); }); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, + watch2.elapsedMicroseconds()); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); } catch (...) { @@ -239,6 +266,8 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Wake up a free thread to run the new job. new_job_or_shutdown.notify_one(); + ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs); + return static_cast(true); } @@ -262,7 +291,14 @@ void ThreadPoolImpl::startNewThreadsNoLock() try { + Stopwatch watch; threads.front() = Thread([this, it = threads.begin()] { worker(it); }); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolThreadCreationMicroseconds : ProfileEvents::LocalThreadPoolThreadCreationMicroseconds, + watch.elapsedMicroseconds()); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolExpansions : ProfileEvents::LocalThreadPoolExpansions); + } catch (...) { @@ -293,7 +329,11 @@ void ThreadPoolImpl::scheduleOrThrow(Job job, Priority priority, uint64_ template void ThreadPoolImpl::wait() { + Stopwatch watch; std::unique_lock lock(mutex); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, + watch.elapsedMicroseconds()); /// Signal here just in case. /// If threads are waiting on condition variables, but there are some jobs in the queue /// then it will prevent us from deadlock. @@ -334,7 +374,11 @@ void ThreadPoolImpl::finalize() /// Wait for all currently running jobs to finish (we don't wait for all scheduled jobs here like the function wait() does). for (auto & thread : threads) + { thread.join(); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); + } threads.clear(); } @@ -391,7 +435,11 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ std::optional job_data; { + Stopwatch watch; std::unique_lock lock(mutex); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolLockWaitMicroseconds : ProfileEvents::LocalThreadPoolLockWaitMicroseconds, + watch.elapsedMicroseconds()); // Finish with previous job if any if (job_is_done) @@ -424,6 +472,8 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ { thread_it->detach(); threads.erase(thread_it); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolShrinks : ProfileEvents::LocalThreadPoolShrinks); } return; } @@ -459,7 +509,22 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ CurrentMetrics::Increment metric_active_pool_threads(metric_active_threads); - job_data->job(); + if constexpr (!std::is_same_v) + { + Stopwatch watch; + job_data->job(); + // This metric is less relevant for the global thread pool, as it would show large values (time while + // a thread was used by local pools) and increment only when local pools are destroyed. + // + // In cases where global pool threads are used directly (without a local thread pool), distinguishing + // them is difficult. + ProfileEvents::increment(ProfileEvents::LocalThreadPoolBusyMicroseconds, watch.elapsedMicroseconds()); + } + else + { + job_data->job(); + } + if (thread_trace_context.root_span.isTraceEnabled()) { From ca42ff6a095119b16751a53782f9f0f674e37a60 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Fri, 9 Feb 2024 08:14:05 +0100 Subject: [PATCH 2/5] add one more metric --- src/Common/ProfileEvents.cpp | 2 ++ src/Common/ThreadPool.cpp | 15 +++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 6bcdf9d8e91c..c09599e71723 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -91,12 +91,14 @@ M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \ M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \ + M(GlobalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \ M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \ M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \ M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \ M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \ M(LocalThreadPoolBusyMicroseconds, "Total time threads have spent executing the actual work.") \ + M(LocalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \ \ M(DiskS3GetRequestThrottlerCount, "Number of DiskS3 GET and SELECT requests passed through throttler.") \ M(DiskS3GetRequestThrottlerSleepMicroseconds, "Total time a query was sleeping to conform DiskS3 GET and SELECT request throttling.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index d93859e1abc1..0b28b7567a75 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -35,6 +35,7 @@ namespace ProfileEvents extern const Event GlobalThreadPoolThreadCreationMicroseconds; extern const Event GlobalThreadPoolLockWaitMicroseconds; extern const Event GlobalThreadPoolJobs; + extern const Event GlobalThreadPoolJobWaitTimeMicroseconds; extern const Event LocalThreadPoolExpansions; extern const Event LocalThreadPoolShrinks; @@ -42,6 +43,8 @@ namespace ProfileEvents extern const Event LocalThreadPoolLockWaitMicroseconds; extern const Event LocalThreadPoolJobs; extern const Event LocalThreadPoolBusyMicroseconds; + extern const Event LocalThreadPoolJobWaitTimeMicroseconds; + } class JobWithPriority @@ -57,6 +60,7 @@ class JobWithPriority /// Call stacks of all jobs' schedulings leading to this one std::vector frame_pointers; bool enable_job_stack_trace = false; + Stopwatch job_create_time; JobWithPriority( Job job_, Priority priority_, CurrentMetrics::Metric metric, @@ -76,6 +80,13 @@ class JobWithPriority { return priority > rhs.priority; // Reversed for `priority_queue` max-heap to yield minimum value (i.e. highest priority) first } + + UInt64 elapsedMicroseconds() const + { + return job_create_time.elapsedMicroseconds(); + } + + }; static constexpr auto DEFAULT_THREAD_NAME = "ThreadPool"; @@ -483,6 +494,10 @@ void ThreadPoolImpl::worker(typename std::list::iterator thread_ job_data = std::move(const_cast(jobs.top())); jobs.pop(); + ProfileEvents::increment( + std::is_same_v ? ProfileEvents::GlobalThreadPoolJobWaitTimeMicroseconds : ProfileEvents::LocalThreadPoolJobWaitTimeMicroseconds, + job_data->elapsedMicroseconds()); + /// We don't run jobs after `shutdown` is set, but we have to properly dequeue all jobs and finish them. if (shutdown) { From 5995004a7dce4c63619b708d8c7bc99dc328c433 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 20 Aug 2024 22:35:32 +0200 Subject: [PATCH 3/5] Mark LocalThread metrics obsolete due to https://github.com/ClickHouse/ClickHouse/pull/47880 --- src/Common/CurrentMetrics.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 23ef44f3b323..adc074738786 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -75,9 +75,9 @@ M(GlobalThread, "Number of threads in global thread pool.") \ M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \ M(GlobalThreadScheduled, "Number of queued or active jobs in global thread pool.") \ - M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \ - M(LocalThreadActive, "Number of threads in local thread pools running a task.") \ - M(LocalThreadScheduled, "Number of queued or active jobs in local thread pools.") \ + M(LocalThread, "Obsolete. Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \ + M(LocalThreadActive, "Obsolete. Number of threads in local thread pools running a task.") \ + M(LocalThreadScheduled, "Obsolete. Number of queued or active jobs in local thread pools.") \ M(MergeTreeDataSelectExecutorThreads, "Number of threads in the MergeTreeDataSelectExecutor thread pool.") \ M(MergeTreeDataSelectExecutorThreadsActive, "Number of threads in the MergeTreeDataSelectExecutor thread pool running a task.") \ M(MergeTreeDataSelectExecutorThreadsScheduled, "Number of queued or active jobs in the MergeTreeDataSelectExecutor thread pool.") \ From 943f05998a8223e5a5c08b4de182e21c6992d522 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Tue, 20 Aug 2024 21:39:36 +0200 Subject: [PATCH 4/5] make jobs queue in the ThreadPool stable (i.e. FIFO for the same priority), otherwise some jobs can stay in queue untaken for a long time --- src/Common/ThreadPool.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index a31d793264e9..fd9149bda045 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -131,7 +131,7 @@ class ThreadPoolImpl bool threads_remove_themselves = true; const bool shutdown_on_exception = true; - boost::heap::priority_queue jobs; + boost::heap::priority_queue> jobs; std::list threads; std::exception_ptr first_exception; std::stack on_destroy_callbacks; From 01838c60d17db22536c84edb19e437ed1e4a3f86 Mon Sep 17 00:00:00 2001 From: Mikhail Filimonov Date: Wed, 21 Aug 2024 18:20:05 +0200 Subject: [PATCH 5/5] fix style --- src/Common/ProfileEvents.cpp | 4 ++-- src/Common/ThreadPool.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index c09599e71723..044f787aee98 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -89,11 +89,11 @@ M(GlobalThreadPoolExpansions, "Counts the total number of times new threads have been added to the global thread pool. This metric indicates the frequency of expansions in the global thread pool to accommodate increased processing demands.") \ M(GlobalThreadPoolShrinks, "Counts the total number of times the global thread pool has shrunk by removing threads. This occurs when the number of idle threads exceeds max_thread_pool_free_size, indicating adjustments in the global thread pool size in response to decreased thread utilization.") \ M(GlobalThreadPoolThreadCreationMicroseconds, "Total time spent waiting for new threads to start.") \ - M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ + M(GlobalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the global thread pool.") \ M(GlobalThreadPoolJobs, "Counts the number of jobs that have been pushed to the global thread pool.") \ M(GlobalThreadPoolJobWaitTimeMicroseconds, "Measures the elapsed time from when a job is scheduled in the thread pool to when it is picked up for execution by a worker thread. This metric helps identify delays in job processing, indicating the responsiveness of the thread pool to new tasks.") \ M(LocalThreadPoolExpansions, "Counts the total number of times threads have been borrowed from the global thread pool to expand local thread pools.") \ - M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ + M(LocalThreadPoolShrinks, "Counts the total number of times threads have been returned to the global thread pool from local thread pools.") \ M(LocalThreadPoolThreadCreationMicroseconds, "Total time local thread pools have spent waiting to borrow a thread from the global pool.") \ M(LocalThreadPoolLockWaitMicroseconds, "Total time threads have spent waiting for locks in the local thread pools.") \ M(LocalThreadPoolJobs, "Counts the number of jobs that have been pushed to the local thread pools.") \ diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 0b28b7567a75..8685533e2d10 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -277,7 +277,7 @@ ReturnType ThreadPoolImpl::scheduleImpl(Job job, Priority priority, std: /// Wake up a free thread to run the new job. new_job_or_shutdown.notify_one(); - ProfileEvents::increment( std::is_same_v ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs); + ProfileEvents::increment(std::is_same_v ? ProfileEvents::GlobalThreadPoolJobs : ProfileEvents::LocalThreadPoolJobs); return static_cast(true); }