Skip to content

Commit b27a63e

Browse files
authored
chore: add a helper method to create a dedicated virtual thread scheduler. (#164)
Motivation: Add a helper method to create a separate virtual thread scheduler.
1 parent 9497af4 commit b27a63e

File tree

2 files changed

+48
-2
lines changed

2 files changed

+48
-2
lines changed

cask/src/cask/internal/Util.scala

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import java.io.{InputStream, PrintWriter, StringWriter}
44
import scala.collection.generic.CanBuildFrom
55
import scala.collection.mutable
66
import java.io.OutputStream
7-
import java.lang.invoke.{MethodHandles, MethodType}
8-
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ThreadFactory}
7+
import java.lang.invoke.{MethodHandle, MethodHandles, MethodType}
8+
import java.util.concurrent.atomic.AtomicInteger
9+
import java.util.concurrent.{Executor, ExecutorService, ForkJoinPool, ForkJoinWorkerThread, ThreadFactory}
910
import scala.annotation.switch
11+
import scala.concurrent.duration.TimeUnit
1012
import scala.concurrent.{ExecutionContext, Future, Promise}
1113
import scala.util.Try
1214
import scala.util.control.NonFatal
@@ -69,6 +71,49 @@ object Util {
6971
}
7072
}
7173

74+
/**
75+
* A helper class to create the carrier thread for the virtual thread,
76+
* Require Java 21 or above.
77+
* */
78+
private object CarrierThreadFactory extends ForkJoinPool.ForkJoinWorkerThreadFactory {
79+
private val counter = new AtomicInteger(0)
80+
private val clazz = lookup.findClass("jdk.internal.misc.CarrierThread")
81+
private val constructor: MethodHandle = lookup.findConstructor(
82+
clazz,
83+
MethodType.methodType(classOf[Unit], classOf[ForkJoinPool]))
84+
85+
override def newThread(pool: ForkJoinPool): ForkJoinWorkerThread = {
86+
val carrierThread = constructor.invoke(pool).asInstanceOf[ForkJoinWorkerThread]
87+
// Set the name of the carrier thread
88+
carrierThread.setName("cask-carrier-thread-" + counter.incrementAndGet())
89+
carrierThread
90+
}
91+
}
92+
93+
/**
94+
* Create a dedicated forkjoin based scheduler for the virtual thread.
95+
* NOTE: you can use other threads pool as scheduler too, this method just integrated the `CarrierThreadFactory`
96+
* when creating the ForkJoinPool.
97+
* */
98+
def createForkJoinPoolBasedScheduler(parallelism: Int,
99+
corePoolSize: Int,
100+
maximumPoolSize: Int,
101+
keepAliveTime: Int,
102+
timeUnit: TimeUnit): Executor = {
103+
new ForkJoinPool(
104+
parallelism,
105+
CarrierThreadFactory,
106+
(_: Thread, _: Throwable) => {}, // ignored for carrier thread
107+
true, //FIFO
108+
corePoolSize,
109+
maximumPoolSize,
110+
parallelism / 2,
111+
(_: ForkJoinPool) => true, //which is needed for virtual thread
112+
keepAliveTime,
113+
timeUnit
114+
)
115+
}
116+
72117
/**
73118
* Create a virtual thread factory with a executor, the executor will be used as the scheduler of
74119
* virtual thread.

docs/pages/1 - Cask - a Scala HTTP micro-framework.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,7 @@ Cask can support using Virtual Threads to handle the request out of the box, you
477477
1. You can change the default scheduler of the carrier threads with `cask.internal.Util.createVirtualThreadExecutor` method, but keep in mind, that's not officially supported by JDK for now.
478478
2. You can supply your own `Executor` by override the `handlerExecutor()` method in your `cask.Main` object, which will be called only once when the server starts.
479479
3. You can use `jdk.internal.misc.Blocker`'s `begin` and `end` methods to help the `ForkJoinPool` when needed.
480+
4. You can use `Util.createVirtualThreadScheduler` to create separate `ForkJoinPool` as scheduler for the virtual threads.
480481
481482
**NOTE**:
482483

0 commit comments

Comments
 (0)