@@ -925,6 +925,227 @@ JL_DLLEXPORT int jl_alignment(size_t sz)
925925 return jl_gc_alignment (sz );
926926}
927927
928+ // Heartbeat mechanism for Julia's task scheduler
929+ // ---
930+ // Start a thread that does not participate in running Julia's tasks. This
931+ // thread simply sleeps until the heartbeat mechanism is enabled. When
932+ // enabled, the heartbeat thread enters a loop in which it blocks waiting
933+ // for the specified heartbeat interval. If, within that interval,
934+ // `jl_heartbeat()` is *not* called at least once, then the thread calls
935+ // `jl_print_task_backtraces(0)`.
936+
937+ #ifdef JL_HEARTBEAT_THREAD
938+
939+ #include <time.h>
940+
941+ volatile int heartbeat_enabled ;
942+ uv_sem_t heartbeat_on_sem , // jl_heartbeat_enable -> thread
943+ heartbeat_off_sem ; // thread -> jl_heartbeat_enable
944+ int heartbeat_interval_s ,
945+ n_loss_reports ,
946+ reset_reporting_s ;
947+ int last_report_s , report_interval_s , n_reported ;
948+ _Atomic(int ) heartbeats ;
949+
950+ JL_DLLEXPORT void jl_print_task_backtraces (int show_done ) JL_NOTSAFEPOINT ;
951+ void jl_heartbeat_threadfun (void * arg );
952+
953+ // start the heartbeat thread with heartbeats disabled
954+ void jl_init_heartbeat (void )
955+ {
956+ uv_thread_t uvtid ;
957+ heartbeat_enabled = 0 ;
958+ uv_sem_init (& heartbeat_on_sem , 0 );
959+ uv_sem_init (& heartbeat_off_sem , 0 );
960+ uv_thread_create (& uvtid , jl_heartbeat_threadfun , NULL );
961+ uv_thread_detach (& uvtid );
962+ }
963+
964+ // enable/disable heartbeats
965+ // heartbeat_s: interval within which jl_heartbeat() must be called
966+ // n_reports: for one heartbeat loss interval, how many times to report
967+ // reset_reporting_after_s: how long to wait after a heartbeat loss
968+ // interval and a return to steady heartbeats, before resetting
969+ // reporting behavior
970+ //
971+ // When disabling heartbeats, the heartbeat thread must wake up,
972+ // find out that heartbeats are now diabled, and reset. For now, we
973+ // handle this by preventing re-enabling of heartbeats until this
974+ // completes.
975+ JL_DLLEXPORT int jl_heartbeat_enable (int heartbeat_s , int n_reports ,
976+ int reset_reporting_after_s )
977+ {
978+ if (heartbeat_s <= 0 ) {
979+ heartbeat_enabled = 0 ;
980+ heartbeat_interval_s = n_loss_reports = reset_reporting_s = 0 ;
981+ }
982+ else {
983+ // must disable before enabling
984+ if (heartbeat_enabled ) {
985+ return -1 ;
986+ }
987+ // heartbeat thread must be ready
988+ if (uv_sem_trywait (& heartbeat_off_sem ) != 0 ) {
989+ return -1 ;
990+ }
991+
992+ jl_atomic_store_relaxed (& heartbeats , 0 );
993+ heartbeat_interval_s = heartbeat_s ;
994+ n_loss_reports = n_reports ;
995+ reset_reporting_s = reset_reporting_after_s ;
996+ last_report_s = 0 ;
997+ report_interval_s = heartbeat_interval_s ;
998+ heartbeat_enabled = 1 ;
999+ uv_sem_post (& heartbeat_on_sem ); // wake the heartbeat thread
1000+ }
1001+ return 0 ;
1002+ }
1003+
1004+ // heartbeat
1005+ JL_DLLEXPORT void jl_heartbeat (void )
1006+ {
1007+ jl_atomic_fetch_add (& heartbeats , 1 );
1008+ }
1009+
1010+ // sleep the thread for the specified interval
1011+ void sleep_for (int secs , int nsecs )
1012+ {
1013+ struct timespec rqtp , rmtp ;
1014+ rqtp .tv_sec = secs ;
1015+ rqtp .tv_nsec = nsecs ;
1016+ rmtp .tv_sec = 0 ;
1017+ rmtp .tv_nsec = 0 ;
1018+ for (; ;) {
1019+ // this suspends the thread so we aren't using CPU
1020+ if (nanosleep (& rqtp , & rmtp ) == 0 ) {
1021+ return ;
1022+ }
1023+ // TODO: else if (errno == EINTR)
1024+ // this could be SIGTERM and we should shutdown but how to find out?
1025+ rqtp = rmtp ;
1026+ }
1027+ }
1028+
1029+ // check for heartbeats and maybe report loss
1030+ uint8_t check_heartbeats (uint8_t gc_state )
1031+ {
1032+ int hb = jl_atomic_exchange (& heartbeats , 0 );
1033+ uint64_t curr_s = jl_hrtime () / 1e9 ;
1034+
1035+ if (hb <= 0 ) {
1036+ // we didn't get a heartbeat in the last interval; should we report?
1037+ if (n_reported < n_loss_reports &&
1038+ curr_s - last_report_s >= report_interval_s ) {
1039+ jl_task_t * ct = jl_current_task ;
1040+ jl_ptls_t ptls = ct -> ptls ;
1041+
1042+ // exit GC-safe region to report then re-enter
1043+ jl_gc_safe_leave (ptls , gc_state );
1044+ jl_safe_printf ("==== heartbeat loss ====\n" );
1045+ jl_print_task_backtraces (0 );
1046+ gc_state = jl_gc_safe_enter (ptls );
1047+
1048+ // we've reported
1049+ n_reported ++ ;
1050+
1051+ // record the reporting time _after_ the report
1052+ last_report_s = jl_hrtime () / 1e9 ;
1053+
1054+ // double the reporting interval up to a maximum
1055+ if (report_interval_s < 60 * heartbeat_interval_s ) {
1056+ report_interval_s *= 2 ;
1057+ }
1058+ }
1059+ // no heartbeats, don't change reporting state
1060+ return gc_state ;
1061+ }
1062+ else {
1063+ // we got a heartbeat; reset the report count
1064+ n_reported = 0 ;
1065+ }
1066+
1067+ // reset the reporting interval only once we're steadily getting
1068+ // heartbeats for the requested reset interval
1069+ if (curr_s - reset_reporting_s > last_report_s ) {
1070+ report_interval_s = heartbeat_interval_s ;
1071+ }
1072+
1073+ return gc_state ;
1074+ }
1075+
1076+ // heartbeat thread function
1077+ void jl_heartbeat_threadfun (void * arg )
1078+ {
1079+ int s , ns = 1e9 - 1 , rs ;
1080+ uint64_t t0 , tchb ;
1081+
1082+ // We need a TLS because backtraces are accumulated into ptls->bt_size
1083+ // and ptls->bt_data, so we need to call jl_adopt_thread().
1084+ jl_adopt_thread ();
1085+ jl_task_t * ct = jl_current_task ;
1086+ jl_ptls_t ptls = ct -> ptls ;
1087+
1088+ // Don't hold up GC, this thread doesn't participate.
1089+ uint8_t gc_state = jl_gc_safe_enter (ptls );
1090+
1091+ for (;;) {
1092+ if (!heartbeat_enabled ) {
1093+ // post the off semaphore to indicate we're ready to enable
1094+ uv_sem_post (& heartbeat_off_sem );
1095+
1096+ // sleep the thread here; this semaphore is posted in
1097+ // jl_heartbeat_enable()
1098+ uv_sem_wait (& heartbeat_on_sem );
1099+
1100+ // Set the sleep duration.
1101+ s = heartbeat_interval_s - 1 ;
1102+ ns = 1e9 - 1 ;
1103+ continue ;
1104+ }
1105+
1106+ // heartbeat is enabled; sleep, waiting for the desired interval
1107+ sleep_for (s , ns );
1108+
1109+ // if heartbeats were turned off while we were sleeping, reset
1110+ if (!heartbeat_enabled ) {
1111+ continue ;
1112+ }
1113+
1114+ // check if any heartbeats have happened, report as appropriate
1115+ t0 = jl_hrtime ();
1116+ gc_state = check_heartbeats (gc_state );
1117+ tchb = jl_hrtime () - t0 ;
1118+
1119+ // adjust the next sleep duration based on how long the heartbeat
1120+ // check took
1121+ rs = 1 ;
1122+ while (tchb > 1e9 ) {
1123+ rs ++ ;
1124+ tchb -= 1e9 ;
1125+ }
1126+ s = heartbeat_interval_s - rs ;
1127+ ns = 1e9 - tchb ;
1128+ }
1129+ }
1130+
1131+ #else // !JL_HEARTBEAT_THREAD
1132+
1133+ void jl_init_heartbeat (void )
1134+ {
1135+ }
1136+
1137+ JL_DLLEXPORT int jl_heartbeat_enable (int heartbeat_s , int n_reports ,
1138+ int reset_reporting_after_s )
1139+ {
1140+ return -1 ;
1141+ }
1142+
1143+ JL_DLLEXPORT void jl_heartbeat (void )
1144+ {
1145+ }
1146+
1147+ #endif // JL_HEARTBEAT_THREAD
1148+
9281149#ifdef __cplusplus
9291150}
9301151#endif
0 commit comments