IvanPostu/kotlin-starter

Kotlin Starter

Description

A starter Kotlin project.

Prerequisites

  • IntelliJ detekt plugin
    • Enable: Settings -> Tools -> detekt
  • Disable wildcard imports: Settings -> Editor -> Code Style -> Kotlin
  • New line at the end of the file: Settings -> Editor -> General -> Ensure every saved file ends with a line break

Shell

# switch java version (available only in nix shell ./app.nix)
switch_java 8|11|17|21

jps -l # shows java processes
jstack 130201 > threaddump.txt
jmap -dump:live,format=b,file=myheapdump.hprof 130201

visualvm # bundled as jvisualvm up to JDK 8, a separate download since JDK 9; requires its own nixos package

./gradlew :app:run
./gradlew run

# automatically part of the check step
./gradlew detekt

./gradlew ktlintFormat
./gradlew ktlintCheck

# build and run
./gradlew clean build --no-daemon
./gradlew build --no-daemon
java -jar ./app/build/libs/app-1.0-SNAPSHOT-all.jar

# set the environment variable
$Env:KOTLINBOOK_ENV = "production" # Windows (PowerShell)
export KOTLINBOOK_ENV=production # Linux

docker build -f Dockerfile -t kotlinapp:latest .
docker run -p 9000:8081 --name kotlinapp kotlinapp:latest
docker run -p 9000:8081 --restart=always -d --name kotlinapp kotlinapp:latest
docker rm -f kotlinapp

Info

  • To manage JDK versions manually on Windows, use https://github.com/shyiko/jabba

  • If the main method isn't inside a class, the Kt suffix is required, e.g., mainClass.set("com.iv127.kotlin.starter.MainKt"); where Main is the name of the source file. Otherwise, if the main method is inside the Main class, then mainClass.set("com.iv127.kotlin.starter.Main")

  • run via gradle: ./gradlew run which will use main defined in application, see build.gradle.kts

  • Platform type - type that potentially can be null, but bypasses compile-time null-check, e.g. calling java method that returns ref to an object

    • Platform types essentially mean “I'll allow you to pass this thing, which might be null, to a non-nullable type.”
    • Kotlin will fail early here as well. Kotlin adds runtime checks to all variable and property assignments where the type is non-nullable. Kotlin will throw an error on assignment, so in the end, the variable will indeed be non-nullable.
  • Elvis operator is a convenient way to specify a default value when something is null (it does not react to false), e.g. var test = nullable ?: default

  • For prod env if env variable APPLICATION_HTTP_PORT is not specified, com.typesafe.config will fail

  • Nix shell with all settings and env variables; nix-shell ./app.nix --run bash

  • Prefixing application specific environment variables is a good practice

  • Data classes have a built-in copy function

  • Kotlin HOFs (higher-order functions):

    • also runs a side effect on the value (similar to Stream.peek in Java) and returns the same object it was called on
    • let works like Stream.map in Java (see KotlinLetUsageTest). With the safe call ?.let, the block is executed ONLY if the value is not null
    • fold works like reduce in JS or collect in Java; the accumulator can be of any type
    • use is for classes that implement Closeable; it calls close at the end, even if an exception is thrown in the block
    • apply runs initialization logic on an object after it has been created and returns that object
  • A lambda with receiver is a special kind of lambda where you can call methods and access properties of a receiver object directly inside the lambda, without specifying it explicitly, see com.iv127.kotlin.starter.StringExtensionFunctionTest

  • Benchmarks and real-world experience have shown that, in some cases, you'll get more performance out of your database if you limit the maximum number of simultaneous connections to about half of what the database supports

  • javax.sql.DataSource is an abstraction over connection pool

  • DB interaction, briefly:

    • create connection or get connection from DataSource
    • create statement
    • execute the statement (e.g. statement.executeQuery or statement.executeUpdate)
    • process result set
  • Connection is closeable

    • If the connection was created manually, close disconnects it.
    • If the connection was obtained from the connection pool, close returns it to the pool.
  • function reference e.g. ::migrateDataSource

  • Flyway migration name pattern V{number}__{myFileName}.sql

  • Flyway repeatable migration name pattern R__{myFileName}.sql, which is re-applied every time its checksum changes

  • blue/green deployments, meaning that when you deploy the latest version of your code, you keep the previous version up and running until the process with your new code has started up and is fully initialized.

  • It is important to make migrations backward compatible for blue-green deployments

  • Seed Data - data that web app expects to always be in the database

  • Rerunning Failed Migration

    • Make manual changes
    • To make Flyway believe a migration never ran, update the flyway_schema_history table. Assuming the migration that failed was V5, run DELETE FROM flyway_schema_history WHERE version = 5 to make Flyway rerun the migration from scratch next time on app startup.
  • Manually Performing Migration

    • Make manual changes
    • UPDATE flyway_schema_history SET success = true WHERE version = 5; on the next startup, Flyway will think that the migration has already run
  • Cast from Long? to Long, e.g. return userId!!, if userId is null it will throw NPE

  • "black box tests" signifying that your tests have as little knowledge as possible about the inner workings of the system under test

  • testing hierarchy: unit, integration, system, acceptance (E2E)

  • Function Type

    • a type which defines signature of a method, e.g. val greeter: (String) -> String = { name -> "Hello, $name!" }
    • analogue of functional interface in java
  • Function type with receiver is a function definition like Application.() -> Unit, where inside the body it is possible to use this of the Application type.

  • Extension Function - invoke the function as if it were a method defined on the class itself, see com.iv127.kotlin.starter.StringExtensionFunctionTest

  • The difference between kotlin.coroutines and kotlinx.coroutines

    • kotlin.coroutines - low-level Kotlin API (part of the standard library) for working with async code and its state
    • kotlinx.coroutines - richer functionality for async code, built on top of kotlin.coroutines
  • Kotlin supports operator overloading, e.g. h1 { text("Hello, World!") } is the same as h1 { +"Hello, World!" }

  • AWS Lambda Handlers:

    • P228 example of AWS Lambda handler
    • P241 example of AWS Lambda handler, more sophisticated example
    • P244 optimize lambda function
  • GraalVM, Kotlin/JS, and Kotlin Native

    • Pros: startup time (no java runtime load on startup)
    • Cons: java libraries such as Hikari Connection Pool can not work
  • If a lateinit var is accessed before it has been set, Kotlin will immediately throw a kotlin.UninitializedPropertyAccessException

  • Wiring up Spring Security in the servlet environment requires three things:

    • role hierarchy,
    • Spring Security web application context
    • Spring Security filter that's mapped to the servlet.
  • val q by lazy { ...expression that returns something } is a convenient way to defer initialization of a property until it's first accessed

    val value by lazy(LazyThreadSafetyMode.SYNCHRONIZED) {// DEFAULT
        // only one thread will compute
        computeValue()
    }
    val value by lazy(LazyThreadSafetyMode.PUBLICATION) {
      // block can be executed many times by many threads, only first result will be cached
      computeValue()
    }
    val value by lazy(LazyThreadSafetyMode.NONE) {
      // no sync
      computeValue()
    }
  • Inline function - Kotlin compiler copies the body of the function into every location where it is called

  • Contracts are ways of telling the type system about things it cannot infer on its own, but are nevertheless true statements, see com.iv127.kotlin.starter.KotlinNotNullCustomContractTest

    • it is possible to make a contract that doesn't hold true in the implementation.
  • Sealed class - a class hierarchy that is known at compile time

    • Direct subclasses must be declared in the same package and module as the sealed class (before Kotlin 1.5: in the same file)

    • can be used for Result types or state machine implementation

    • example:

      sealed class Result
      
      data class Success(val data: String) : Result()
      data class Error(val exception: Throwable) : Result()
      object Loading : Result()
  • Destructuring is a language mechanism which allows unpacking an object into multiple variables

    data class User(val name: String, val age: Int)
    
    val user = User("Alice", 30)
    val (name, age) = user
  • when using locks to coordinate access to a variable, the same lock must be used wherever that variable is accessed

  • server vs. client flags:

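    • java -server -cp yourApp.jar com.example.Main selects the server VM, which applies more aggressive optimizations
    • java -client -cp yourApp.jar com.example.Main selects the client VM, which favors fast startup (64-bit JDKs ignore this flag and use the server VM)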

API Usage

# main app
curl -i -X GET http://0.0.0.0:8081/ ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/public/app.css ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/param_test?foo=abc123 ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/json_test ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/json_test_with_header ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/err ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/db_test1 ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/db_test2 ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/single_user ; echo -e '\n'
curl -i -X GET http://0.0.0.0:8081/html_test ; echo -e '\n'
curl -i -X POST http://0.0.0.0:8081/test_json \
  -H "Content-Type: application/json" \
  -d '{"email": "[email protected]", "password": "test1234"}' \
  ; echo -e '\n'
curl -i -X POST http://0.0.0.0:8081/jwt_login \
  -d '{"username": "[email protected]", "password": "1234"}' \
  ; echo -e '\n' 
curl -i -X GET http://0.0.0.0:8081/jwt_secret \
  -H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJhdWQiOiJteUFwcCIsImlzcyI6Imh0dHA6Ly8wLjAuMC4wOjQyMDciLCJleHAiOjE3NTQ5NTMxMzEsInVzZXJJZCI6MX0.l9-27G2eeqieL5ENkOvorasuIrMZ6wDr7hWPL_NwLFo" \
  ; echo -e '\n'

# fake server
curl -i -X GET http://0.0.0.0:9876/random_number ; echo -e '\n'
curl -i -X GET http://0.0.0.0:9876/ping ; echo -e '\n'
curl -i -X POST http://0.0.0.0:9876/reverse \
  -H "Content-Type: text/plain" \
  -d "This is the raw string body" \
  ; echo -e '\n'

# jooby server
curl -i -X POST http://0.0.0.0:8081/db_test \
  -H "Content-Type: text/plain" \
  -d '{"email": "[email protected]"}' \
  ; echo -e '\n'

Observation

  • Thread-per-request problem: idle threads, e.g. a request thread sleeps while it waits for a response from the database server or an external service

    • context switches: every blocked thread has to be parked and later rescheduled, and that switching costs CPU time without doing useful work
    • system calls are time-consuming because Java has to call into the OS kernel, e.g. to suspend and resume a thread that contends for a synchronized block
    • the thread scheduler, which performs the context switches, still has to manage threads that are waiting or sleeping
  • To fix the idle threads problem, Kotlin has a concept called suspension

    • suspends execution of the code at specific point
    • this thread is used for other tasks
    • when the suspended code is ready to continue running, execution will continue, potentially on another thread

Conclusion: a coroutine is a mechanism that lets the Kotlin runtime take control of thread management

  • a suspend function can be called from:

    • a coroutine
    • another suspend function
  • calling .await() on a Deferred<T> suspends the coroutine instead of blocking the thread

  • Coroutines shouldn't contain blocking code

  • For blocking logic such as JDBC use withContext(Dispatchers.IO) which executes blocking logic in a separate thread pool

  • kotlinx.coroutines is a library built on top of kotlin.coroutines. The suspend keyword itself is part of the language; calling a suspend function from a non-suspending context is a compile-time error

  • Cross-origin request: when JavaScript sends a request to a different origin and the request is not a "simple" one (e.g. it uses a JSON content type or custom headers), the browser first sends a preflight OPTIONS request to that server. If the OPTIONS response carries the correct CORS headers, the browser continues and performs the actual request. If not, an error is raised.

  • Example of usage of login api:

    fetch("/login", {
      credentials: "include", // without this, it won't receive and send any cookies alongside your request
      method: "POST",
      headers: {
        "Content-Type": "application/x-www-form-urlencoded"
      },
      body: new URLSearchParams({
        username: ...,
        password: ...
      })
    })
  • Serverless - cloud computing model where the main unit is a handler or function: a piece of logic that takes data, processes it and returns a result

    • (like AWS Lambda, Azure Functions, Google Cloud Functions)
    • No server management & automatic scaling
    • Limited execution time
    • initialization/execution
    • cost is only related to execution time
    • cold/warm start
  • Java optimization (limit JIT to the C1 compiler)

    • set the environment variable JAVA_TOOL_OPTIONS to -XX:+TieredCompilation -XX:TieredStopAtLevel=1, which stops tiered compilation at level 1 (C1 only, no C2), trading peak throughput for faster startup
  • Spring Security requires the servlet specification because it is basically a servlet filter

  • Netty doesn't support the servlet specification, which is why I am using Jetty

  • Authentication management in Ktor handlers

    • best option, coroutine compatible, request.javaSecurityPrincipal
    • thread local approach SecurityContextHolder.getContext().authentication, incompatible with coroutines and async code
  • Jooby - signs cookie value but doesn't encrypt it

    • Ktor - does both, signing and encryption
  • Jooby stores the JWT in the session cookie and does not give control over how to utilize the token

  • What happens if env variable used in prod.conf is missing

    • Typesafe lib.:

      APPLICATION_ENV=prod java -jar ./app/build/libs/app-1.0-SNAPSHOT-all.jar
      Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: app-prod.conf @ jar:file:/home/iv127/Projects/kotlin-starter/app/build/libs/app-1.0-SNAPSHOT-all.jar!/app-prod.conf: 1: Could not resolve substitution to a value: ${APPLICATION_HTTP_PORT}
    • Hoplite lib.:

      Could not parse classpath:/app-prod.conf com.typesafe.config.ConfigException$UnresolvedSubstitution: Reader: 1: Could not resolve substitution to a value: ${APPLICATION_HTTP_PORT}
  • Hoplite has support for secrets through Masked type which replaces the value with ****

  • Junit5 alternatives: Spek, Kotest

  • Spek framework has intellij plugin (isn't working)

  • Java intrinsic locks are reentrant: a thread that already holds a lock can acquire it again without blocking
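
Whether an intrinsic lock can be re-acquired by the thread that already holds it can be checked directly. A minimal sketch (ReentrancyDemo and its methods are hypothetical names, not part of this repository): outer() takes the lock on this and then calls inner(), which needs the same lock.

```java
// Sketch: the same thread re-acquires an intrinsic lock it already holds.
class ReentrancyDemo {
    private int depth = 0;

    // Acquires the lock on `this`, then calls another synchronized method on the same object.
    synchronized int outer() {
        depth++;
        return inner();
    }

    // Needs the same lock; this call would block forever if the lock were not reentrant.
    synchronized int inner() {
        depth++;
        return depth;
    }

    public static void main(String[] args) {
        System.out.println(new ReentrancyDemo().outer()); // prints 2
    }
}
```

The call returns instead of deadlocking because the JVM tracks the owning thread and an acquisition count per lock.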

TODO

Nested transaction using kotliquery

Kotliquery doesn't support nested transactions, but they can be emulated with savepoints

private fun webResponseTx(
    dataSource: DataSource,
    handler: suspend PipelineContext<Unit, ApplicationCall>.(
        dbSess: TransactionalSession
    ) -> WebResponse
) = webResponseDb(dataSource) { dbSess ->
    dbSess.transaction { txSess ->
        handler(txSess)
    }
}

fun <A> dbSavePoint(dbSess: Session, body: () -> A): A {
    val sp = dbSess.connection.underlying.setSavepoint()
    return try {
        body().also {
            dbSess.connection.underlying.releaseSavepoint(sp)
        }
    } catch (e: Exception) {
        dbSess.connection.underlying.rollback(sp)
        throw e
    }
}

fun test() {
  sessionOf(dataSource).use { dbSess ->
      dbSess.transaction { txSess ->
          dbSavePoint(txSess) {
              txSess.update(queryOf("INSERT INTO ...")) // main transaction won't fail if this piece fails
          }
      }
  }
}

Access modifiers (Java vs. Kotlin)

| Java | Kotlin |
|------|--------|
| public | public (default) |
| protected | protected |
| (no modifier) | internal |
| private | private |

Concurrency tips

  • synchronized lock a.k.a. intrinsic lock

  • wait/notify/notifyAll - are used in Consumer/Producer logic

    • All 3 should be executed in a synchronized block with the lock on the object on which this method is called.
    • see com.iv127.kotlin.starter.concurrency_in_practice.ObjectWaitNotifyExampleTest
  • Specification - contract

  • Invariants - constraints

  • Post-condition - effect of an operation

  • Thread-safety - situation when a class continues to behave correctly when accessed from multiple threads

  • Race condition - unpredictable result when a section of code is executed by many threads and the outcome depends on timing, e.g.

    • check-then-act (e.g. if a file doesn't exist, create it and write to it)
    • read-modify-write (e.g. incrementing an int)
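
The read-modify-write case can be reproduced with a shared counter. A minimal sketch, assuming a hypothetical CounterRaceDemo class: several threads increment a plain int and an AtomicInteger the same number of times. The plain counter may lose updates (the exact value varies from run to run), while the atomic one always reaches the expected total.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CounterRaceDemo {
    static int unsafeCount = 0;                                 // ++ is read-modify-write, not atomic
    static final AtomicInteger safeCount = new AtomicInteger(); // atomic increment

    // Runs `threads` threads, each incrementing both counters `perThread` times.
    // Returns {unsafe total, safe total}.
    static int[] run(int threads, int perThread) {
        unsafeCount = 0;
        safeCount.set(0);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    unsafeCount++;               // updates can be lost here
                    safeCount.incrementAndGet(); // never loses an update
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return new int[] { unsafeCount, safeCount.get() };
    }

    public static void main(String[] args) {
        int[] totals = run(4, 100_000);
        System.out.println("unsafe=" + totals[0] + " safe=" + totals[1]);
    }
}
```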
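  • Nonatomic 64-bit operations - out-of-thin-air safety holds for all variables except nonvolatile long and double: the JVM may treat a 64-bit read or write as two separate 32-bit operations, so it is possible to read a nonvolatile long and get back the high 32 bits of one value and the low 32 bits of another.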

  • Locking and visibility - synchronized guarantees that changes made by a thread inside the block are visible to the next thread that enters a block guarded by the same lock

    • Of the language keywords, only synchronized and volatile give this guarantee
  • volatile can be used for flags like isInitialized or isShutdown

  • Locking can guarantee both visibility and atomicity; volatile variables can only guarantee visibility.

    • volatile does not make compound operations atomic, e.g. operations where the new value of a variable depends on its old value, such as increment (read-modify-write)
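
A status flag is the typical safe use of volatile, because it is written with a plain assignment and never with a compound operation. A minimal sketch, assuming a hypothetical VolatileFlagDemo class: a worker spins until another thread sets the flag.

```java
class VolatileFlagDemo {
    // volatile: a write by one thread is guaranteed to become visible to the other
    private volatile boolean shutdownRequested = false;
    // written only by the worker thread, read by the caller after join()
    private long iterations = 0;

    // Starts a worker that spins until the flag is set. Returns the worker's
    // iteration count, or -1 if the worker did not stop within the timeout.
    static long runAndStop() {
        VolatileFlagDemo demo = new VolatileFlagDemo();
        Thread worker = new Thread(() -> {
            while (!demo.shutdownRequested) {
                demo.iterations++;
            }
        });
        worker.start();
        try {
            Thread.sleep(50);
            demo.shutdownRequested = true; // plain write, no read-modify-write involved
            worker.join(5_000);
        } catch (InterruptedException e) {
            demo.shutdownRequested = true; // still let the worker finish
            Thread.currentThread().interrupt();
        }
        return worker.isAlive() ? -1 : demo.iterations;
    }

    public static void main(String[] args) {
        System.out.println("worker stopped after " + runAndStop() + " iterations");
    }
}
```

Without volatile, the worker would be allowed to keep reading a stale value of the flag and might never stop.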
  • thread confinement - If data is only accessed from a single thread, no synchronization is needed (To make this easier, Swing provides the invokeLater mechanism to schedule a Runnable for execution in the event thread.)

    • Another example of thread confinement is jdbc connection pool
    • and ThreadLocal
    • local variables (primitives) + copy for reference types
    • Another reason to make a subsystem single-threaded (to use thread confinement) is deadlock avoidance; this is one of the primary reasons most GUI frameworks are single-threaded
  • Static initializers run under a lock taken by the JVM during class initialization, so static assignment and initialization are thread-safe

  • ConcurrentHashMap replacement for synchronized HashMap

  • ConcurrentSkipListMap replacement for synchronized TreeMap

  • ConcurrentSkipListSet replacement for synchronized TreeSet

  • CopyOnWriteArraySet replacement for synchronized Set

  • CopyOnWriteArrayList replacement for synchronized List

  • Why are synchronized collections bad?

    • HashMap.get or List.contains can be time-consuming, and will force other threads to wait during the operation
  • Concurrent collections offer higher throughput with little performance penalty for single-threaded access

  • Concurrent collection iterator doesn't throw ConcurrentModificationException, it traverses elements as they existed when the iterator was constructed, and may (but is not guaranteed to) reflect modifications to the collection after the construction of the iterator

  • isEmpty and size() on a concurrent collection are only approximate while other threads modify it
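
The iterator behaviour can be observed even from a single thread. A minimal sketch, assuming a hypothetical WeaklyConsistentIterationDemo class: the same loop modifies the map while iterating over it, once with a HashMap and once with a ConcurrentHashMap.

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class WeaklyConsistentIterationDemo {
    // Adds one entry while iterating over the key set.
    // Returns true if the iterator reacted with ConcurrentModificationException.
    static boolean throwsCme(Map<String, Integer> map) {
        map.put("a", 1);
        map.put("b", 2);
        map.put("c", 3);
        boolean modified = false;
        try {
            for (String key : map.keySet()) {
                if (!modified) {
                    map.put("d", 4); // structural change during iteration
                    modified = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsCme(new HashMap<>()));           // prints true
        System.out.println(throwsCme(new ConcurrentHashMap<>())); // prints false
    }
}
```

Whether the ConcurrentHashMap iterator also visits the new key "d" is not guaranteed either way, so the sketch does not rely on it.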

  • Blocking queues provide blocking put and take methods as well as the timed equivalents offer and poll. If the queue is full, put blocks until space becomes available; if the queue is empty, take blocks until an element is available.

  • Bounded(fixed size) vs. unbounded(unlimited) blocking queues

  • consumer-producer pattern - producer threads put items into a queue and consumer threads process them
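
A minimal sketch of the producer-consumer pattern on a bounded queue, assuming a hypothetical ProducerConsumerDemo class and a sentinel value (POISON) to signal the end of the work; neither comes from this repository. With a capacity of 2, put blocks whenever the producer gets ahead of the consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {
    private static final int POISON = -1; // sentinel telling the consumer to stop (assumption of this sketch)

    // The producer puts 1..items into a bounded queue; the consumer sums what it takes.
    static long sumViaQueue(int items) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(2); // bounded: put blocks when full
        long[] sum = {0};
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= items; i++) {
                    queue.put(i);
                }
                queue.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                // take blocks while the queue is empty
                for (int value = queue.take(); value != POISON; value = queue.take()) {
                    sum[0] += value;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(sumViaQueue(100)); // prints 5050
    }
}
```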

  • work stealing pattern - if a consumer exhausts the work in its own deque, it can steal work from the tail of someone else’s deque

    • producers can also be consumers
  • Deques: ArrayDeque, LinkedBlockingDeque

  • BlockingQueue implementations: LinkedBlockingQueue, ArrayBlockingQueue (FIFO)

    • PriorityBlockingQueue (ordered by Comparable or Comparator, non FIFO)
    • SynchronousQueue hand off work, put must wait for a take
  • Interruption

    • Propagate the InterruptedException

    • Restore the interrupt - calling interrupt on the current thread, so that code higher up the call stack can see that an interrupt was issued

      public class TaskRunnable implements Runnable {
          BlockingQueue<Task> queue;
          public void run() {
              while (!Thread.currentThread().isInterrupted()) {
                  try {
                      Task task = queue.take();
                      processTask(task);
                  } catch (InterruptedException e) {
                      Thread.currentThread().interrupt(); // restore the interrupt flag so the loop condition becomes false
                  }
              }
      
              // Now we can clean up and exit gracefully
              System.out.println("Thread was interrupted. Exiting...");
          }
      }
  • Latch is a synchronizer that blocks threads until it reaches its terminal state. It works like a gate for threads: once it opens, it stays open and can't change state again

    • CountDownLatch

      public class TestHarness {
          public long timeTasks(int nThreads, final Runnable task)
                  throws InterruptedException {
              final CountDownLatch startGate = new CountDownLatch(1);
              final CountDownLatch endGate = new CountDownLatch(nThreads);
              for (int i = 0; i < nThreads; i++) {
                  Thread t = new Thread() {
                      public void run() {
                          try {
                              startGate.await();
                              try {
                                  task.run();
                              } finally {
                                  endGate.countDown();
                              }
                          } catch (InterruptedException ignored) { }
                      }
                  };
                  t.start();
              }
              long start = System.nanoTime();
              startGate.countDown();
              endGate.await();
              long end = System.nanoTime();
              return end-start;
          }
      }
  • Preloader example

      public class Preloader {
          private final FutureTask<ProductInfo> future =
                  new FutureTask<ProductInfo>(new Callable<ProductInfo>() {
                      public ProductInfo call() throws DataLoadException {
                          return loadProductInfo();
                      }
                  });
          private final Thread thread = new Thread(future);
          public void start() { thread.start(); }
          public ProductInfo get()
                  throws DataLoadException, InterruptedException {
              try {
                  return future.get();
              } catch (ExecutionException e) {
                  Throwable cause = e.getCause();
                  if (cause instanceof DataLoadException)
                      throw (DataLoadException) cause;
                  else
                      throw launderThrowable(cause);
              }
          }
      }
    
      /** If the Throwable is an Error, throw it; if it is a
       * RuntimeException return it, otherwise throw IllegalStateException
      */
      public static RuntimeException launderThrowable(Throwable t) {
          if (t instanceof RuntimeException)
              return (RuntimeException) t;
          else if (t instanceof Error)
              throw (Error) t;
          else
              throw new IllegalStateException("Not unchecked", t);
      }
  • CyclicBarrier allows a fixed number of threads to rendezvous repeatedly at a barrier point

    • If a call to await times out or a thread blocked in await is interrupted, then the barrier is considered broken and all outstanding calls to await terminate with BrokenBarrierException.
    • barrier action - a Runnable that is executed (in one of the subtask threads) when the barrier is successfully passed but before the blocked threads are released
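
A minimal sketch of a barrier with a barrier action, assuming a hypothetical BarrierDemo class: every worker goes through the same number of steps and waits at the barrier after each one, and the barrier action counts how many steps were completed by all workers.

```java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

class BarrierDemo {
    // Runs `workers` threads through `steps` phases.
    // Returns how many times the barrier action ran.
    static int run(int workers, int steps) {
        AtomicInteger completedSteps = new AtomicInteger();
        // The barrier action runs once per step, after the last worker arrives.
        CyclicBarrier barrier = new CyclicBarrier(workers, () -> completedSteps.incrementAndGet());
        Thread[] threads = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    for (int step = 0; step < steps; step++) {
                        // ... this worker's share of the step would go here ...
                        barrier.await(); // wait until every worker has finished this step
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (BrokenBarrierException e) {
                    // another worker was interrupted or timed out: give up
                }
            });
            threads[i].start();
        }
        for (Thread thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return completedSteps.get();
    }

    public static void main(String[] args) {
        System.out.println(run(3, 4)); // prints 4
    }
}
```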
  • Bounded hash set example, works similar to BlockingQueue

    public class BoundedHashSet<T> {
        private final Set<T> set;
        private final Semaphore sem;
        public BoundedHashSet(int bound) {
            this.set = Collections.synchronizedSet(new HashSet<T>());
            sem = new Semaphore(bound);
        }
        public boolean add(T o) throws InterruptedException {
            sem.acquire();
            boolean wasAdded = false;
            try {
                wasAdded = set.add(o);
                return wasAdded;
            }
            finally {
                if (!wasAdded)
                    sem.release();
            }
        }
        public boolean remove(Object o) {
            boolean wasRemoved = set.remove(o);
            if (wasRemoved)
                sem.release();
            return wasRemoved;
        }
    }
  • A barrier is useful in situations where the work of one step can be done in parallel, but all the work associated with a given step must complete before advancing to the next step

  • Exchanger - a two-party barrier in which the parties safely exchange objects (e.g. buffers) at the barrier point
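
A minimal sketch of an exchange between two threads, assuming a hypothetical ExchangerDemo class: each side hands over its own object and receives the other side's object at the same point.

```java
import java.util.concurrent.Exchanger;

class ExchangerDemo {
    // The calling thread offers `mine`, a second thread offers `theirs`.
    // Returns {what the caller received, what the second thread received}.
    static String[] swap(String mine, String theirs) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];
        Thread other = new Thread(() -> {
            try {
                received[1] = exchanger.exchange(theirs); // blocks until the partner arrives
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        other.start();
        try {
            received[0] = exchanger.exchange(mine);
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        String[] result = swap("full buffer", "empty buffer");
        System.out.println(result[0] + " / " + result[1]); // prints empty buffer / full buffer
    }
}
```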

  • memoization - wrapper that remembers the results of previous computations and encapsulates the caching process, good example is LazyLoadingSupplier

    public class Memoizer<A, V> implements Computable<A, V> {
        private final ConcurrentMap<A, Future<V>> cache
                = new ConcurrentHashMap<A, Future<V>>();
        private final Computable<A, V> c;
        public Memoizer(Computable<A, V> c) { this.c = c; }
        public V compute(final A arg) throws InterruptedException {
            while (true) {
                Future<V> f = cache.get(arg);
                if (f == null) {
                    Callable<V> eval = new Callable<V>() {
                        public V call() throws InterruptedException {
                            return c.compute(arg);
                        }
                    };
                    FutureTask<V> ft = new FutureTask<V>(eval);
                    f = cache.putIfAbsent(arg, ft);
                    if (f == null) { f = ft; ft.run(); }
                }
                try {
                    return f.get();
                } catch (CancellationException e) {
                    cache.remove(arg, f);
                } catch (ExecutionException e) {
                    throw launderThrowable(e.getCause());
                }
            }
        }
    }
  • Network congestion - a traffic jam, but for data which happens when too many devices or users are trying to send data through a network at the same time

jvm flags

  • -Djava.security.egd=file:/dev/./urandom avoids a delay on Java startup while entropy is collected for random numbers; it should be used for dev only

  • -Xss sets the stack size of a thread; the default varies between 512 KB and 1 MB depending on the Java version and platform (ignoring OS thread limits, the smaller this value is, the more threads we can create)

    • -Xss512k 512 KB
    • -Xss2m 2 MB
  • -XX:+DisableExplicitGC disables System.gc(), which by design doesn't guarantee anything by itself

  • -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/path/to/heapdump.hprof

  • gc logs java -Xlog:gc*:file=gc.log:time,uptime,level,tags -Xmx1G -jar myapp.jar

  • Two main factors in web applications

    • Responsiveness
    • Throughput
  • Any thread per task approach should be replaced with thread pool usage

  • There is a spectrum of ways to shut down, from graceful shutdown (finish what you’ve started but don’t accept any new work) to abrupt shutdown (turn off the power to the machine room), and various points in between

  • ExecutorService methods

    public interface ExecutorService extends Executor {
        void shutdown();
        List<Runnable> shutdownNow();
        boolean isShutdown();
        boolean isTerminated();
        boolean awaitTermination(long timeout, TimeUnit unit)
                throws InterruptedException;
    
    // ... additional convenience methods for task submission
    }
    • ExecutorService has three states—running, shutting down, and terminated
  • shutdown - gracefully shutdown, executes all tasks in queue, doesn't accept new ones

  • shutdownNow initiates an abrupt shut-down: it attempts to cancel outstanding tasks and does not start any tasks that are queued but not begun

  • It is common to follow shutdown immediately by awaitTermination, creating the effect of synchronously shutting down the ExecutorService
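
A minimal sketch of shutdown followed by awaitTermination, assuming a hypothetical GracefulShutdownDemo class. Tasks queued before shutdown still run, work submitted afterwards is rejected, and shutdownNow is kept as a fallback if the pool does not terminate in time.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class GracefulShutdownDemo {
    // Submits `tasks` tasks, shuts the pool down gracefully and returns the number
    // of completed tasks, or -1 if late work was accepted or the pool did not terminate.
    static int runAndShutdown(int tasks) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            executor.execute(() -> completed.incrementAndGet());
        }
        executor.shutdown(); // no new work accepted, queued tasks still run
        boolean rejected = false;
        try {
            executor.execute(() -> { });
        } catch (RejectedExecutionException e) {
            rejected = true; // expected after shutdown()
        }
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // fall back to abrupt shutdown
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return rejected && executor.isTerminated() ? completed.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println(runAndShutdown(10)); // prints 10
    }
}
```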

  • ScheduledThreadPoolExecutor is better than Timer because Timer uses one thread and if timer task takes too long, the accuracy suffers

  • DelayQueue - an implementation of BlockingQueue whose elements implement Delayed. Each element has an associated delay time and can only be taken once its delay has expired. The queue returns elements in order of their delay expiration time.
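
A minimal sketch of a DelayQueue, assuming a hypothetical DelayedMessage element type: the element with the shorter delay is taken first, regardless of insertion order.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class DelayQueueDemo {
    // Hypothetical element type: a message that becomes available after a delay.
    static final class DelayedMessage implements Delayed {
        final String text;
        private final long readyAtNanos;

        DelayedMessage(String text, long delayMillis) {
            this.text = text;
            this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // remaining delay; zero or negative means the element can be taken
            return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    // Adds a slow and a fast message and returns the order in which they come out.
    static String takeOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.add(new DelayedMessage("slow", 200));
        queue.add(new DelayedMessage("fast", 50));
        try {
            String first = queue.take().text;  // blocks until the shortest delay expires
            String second = queue.take().text;
            return first + "," + second;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(takeOrder()); // prints fast,slow
    }
}
```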

  • Example of bad usage of java.util.Timer

    public class OutOfTime {
        public static void main(String[] args) throws Exception {
            Timer timer = new Timer();
            timer.schedule(new ThrowTask(), 1);
            SECONDS.sleep(1);
            timer.schedule(new ThrowTask(), 1);
            SECONDS.sleep(5);
        }
        static class ThrowTask extends TimerTask {
            public void run() { throw new RuntimeException(); }
        }
    }
    //it terminates after one second with an IllegalStateException  whose message text is “Timer already cancelled”.
  • Task:

    • Runnable void run();
    • Callable V call() throws Exception;
    • 4 states created, submitted, started, and completed
    • Future represents the lifecycle of a task and provides methods to test whether the task has completed or been cancelled, retrieve its result, and cancel the task
    • Once a task is completed, it stays in that state forever.
    public interface Callable<V> {
        V call() throws Exception;
    }
    public interface Future<V> {
        boolean cancel(boolean mayInterruptIfRunning);
        boolean isCancelled();
        boolean isDone();
        V get() throws InterruptedException, ExecutionException,
                CancellationException;
        V get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException,
                CancellationException, TimeoutException;
    }
  • If the task completes by throwing an exception, get rethrows it wrapped in an ExecutionException

    • the underlying exception can be retrieved with getCause
  • if it was cancelled, get throws CancellationException
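
The three outcomes of Future.get can be told apart by the exception type. A minimal sketch, assuming a hypothetical FutureOutcomeDemo class: one task succeeds, one throws, and one is cancelled while it is still sleeping.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureOutcomeDemo {
    // Maps the outcome of a Future to a short description.
    static String describe(Future<String> future) {
        try {
            return "value: " + future.get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage(); // original exception via getCause
        } catch (CancellationException e) {
            return "cancelled";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    static String[] demo() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> okTask = () -> "done";
            Callable<String> failingTask = () -> { throw new IllegalStateException("boom"); };
            Callable<String> slowTask = () -> { Thread.sleep(10_000); return "late"; };

            String ok = describe(executor.submit(okTask));
            String failed = describe(executor.submit(failingTask));
            Future<String> slow = executor.submit(slowTask);
            slow.cancel(true); // cancel before it can complete
            String cancelled = describe(slow);
            return new String[] { ok, failed, cancelled };
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line); // prints value: done, failed: boom, cancelled (one per line)
        }
    }
}
```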

  • coordination overhead - the overhead of splitting a problem into tasks for an executor and joining the results, e.g. parallelStream

  • CompletionService combines the functionality of an Executor and a BlockingQueue. Callable tasks can be submitted to it for execution, and the queue-like methods take and poll retrieve completed results, packaged as Futures, as they become available.

    • usage:
    public class Renderer {
        private final ExecutorService executor;
        Renderer(ExecutorService executor) { this.executor = executor; }
        void renderPage(CharSequence source) {
            final List<ImageInfo> info = scanForImageInfo(source);
            CompletionService<ImageData> completionService =
                    new ExecutorCompletionService<ImageData>(executor);
            for (final ImageInfo imageInfo : info)
                completionService.submit(new Callable<ImageData>() {
                    public ImageData call() {
                        return imageInfo.downloadImage();
                    }
                });
            renderText(source);
            try {
                for (int t = 0, n = info.size(); t < n; t++) {
                    Future<ImageData> f = completionService.take();
                    ImageData imageData = f.get();
                    renderImage(imageData);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    }
    // Using CompletionService to render page elements as they become available
    ///Fetching an advertisement with a time budget
    Page renderPageWithAd() throws InterruptedException {
        long endNanos = System.nanoTime() + TIME_BUDGET;
        Future<Ad> f = exec.submit(new FetchAdTask());
        // Render the page while waiting for the ad
        Page page = renderPageBody();
        Ad ad;
        try {
            // Only wait for the remaining time budget
            long timeLeft = endNanos - System.nanoTime();
            ad = f.get(timeLeft, NANOSECONDS);
        } catch (ExecutionException e) {
            ad = DEFAULT_AD;
        } catch (TimeoutException e) {
            ad = DEFAULT_AD;
            f.cancel(true);
        }
        page.setAd(ad);
        return page;
    }
    // Usage of invokeAll with timeout handling
    private class QuoteTask implements Callable<TravelQuote> {
        private final TravelCompany company;
        private final TravelInfo travelInfo;
        ...
        public TravelQuote call() throws Exception {
            return company.solicitQuote(travelInfo);
        }
    }
    public List<TravelQuote> getRankedTravelQuotes(
            TravelInfo travelInfo, Set<TravelCompany> companies,
            Comparator<TravelQuote> ranking, long time, TimeUnit unit) throws InterruptedException {
        List<QuoteTask> tasks = new ArrayList<QuoteTask>();
        for (TravelCompany company : companies)
            tasks.add(new QuoteTask(company, travelInfo));
        List<Future<TravelQuote>> futures =
                exec.invokeAll(tasks, time, unit);
        List<TravelQuote> quotes =
                new ArrayList<TravelQuote>(tasks.size());
        Iterator<QuoteTask> taskIter = tasks.iterator();
        for (Future<TravelQuote> f : futures) {
            QuoteTask task = taskIter.next();
            try {
                quotes.add(f.get());
            } catch (ExecutionException e) {
                quotes.add(task.getFailureQuote(e.getCause()));
            } catch (CancellationException e) {
                quotes.add(task.getTimeoutQuote(e));
            }
        }
        Collections.sort(quotes, ranking);
        return quotes;
    }
  • The deprecated Thread.stop and suspend shouldn't be used, see oracle doc reference

  • cancellation policy: the rules a task or thread follows for cancellation, i.e. how other code requests it, when the task checks for it, and what it does in response

    @ThreadSafe
    public class PrimeGenerator implements Runnable {
        @GuardedBy("this")
        private final List<BigInteger> primes
                = new ArrayList<BigInteger>();
        private volatile boolean cancelled;
        public void run() {
            BigInteger p = BigInteger.ONE;
            while (!cancelled) {
                p = p.nextProbablePrime();
                synchronized (this) {
                    primes.add(p);
                }
            }
        }
        public void cancel() { cancelled = true; }
        public synchronized List<BigInteger> get() {
            return new ArrayList<BigInteger>(primes);
        }
    }
    
    List<BigInteger> aSecondOfPrimes() throws InterruptedException {
        PrimeGenerator generator = new PrimeGenerator();
        new Thread(generator).start();
        try {
            SECONDS.sleep(1);
        } finally {
            generator.cancel();
        }
        return generator.get();
    }
  • Interruption

    public class Thread {
        public void interrupt() { ... } // interrupts
        public boolean isInterrupted() { ... }
    
        // returns true if the current thread was interrupted AND resets the interrupted flag to false; returns false if it wasn't interrupted
        public static boolean interrupted() { ... }
    }
  • cancellation point: a place in the logic where the thread checks whether its interrupted flag is set and stops itself

    class PrimeProducer extends Thread {
        private final BlockingQueue<BigInteger> queue;
        PrimeProducer(BlockingQueue<BigInteger> queue) {
            this.queue = queue;
        }
        public void run() {
            try {
                BigInteger p = BigInteger.ONE;
                while (!Thread.currentThread().isInterrupted())
                    queue.put(p = p.nextProbablePrime());
            } catch (InterruptedException consumed) {
                /* Allow thread to exit */
            }
        }
        public void cancel() { interrupt(); }
    }
  • When a blocking method throws InterruptedException, the interrupt status of the thread is cleared automatically, so code that swallows the exception should restore the status before returning.

    public Task getNextTask(BlockingQueue<Task> queue) {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    interrupted = true;
                    // fall through and retry
                }
            }
        } finally {
            if (interrupted)
                Thread.currentThread().interrupt();
        }
    }
  • Future.cancel(boolean mayInterruptIfRunning);

    • false means “don’t run this task if it hasn’t started yet”; a task that is already running is left to finish
    • true additionally interrupts the thread running the task. The return value indicates whether the cancellation attempt was successful; it tells you only whether the interruption was delivered, not whether the task detected and acted on it.
  • Canceling a task using future

    public static void timedRun(Runnable r,
    long timeout, TimeUnit unit) throws InterruptedException {
        Future<?> task = taskExec.submit(r);
        try {
            task.get(timeout, unit);
        } catch (TimeoutException e) {
            // task will be cancelled below
        } catch (ExecutionException e) {
            // exception thrown in task; rethrow
            throw launderThrowable(e.getCause());
        } finally {
            // Harmless if task already completed
            task.cancel(true); // interrupt if running
        }
    }
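  • ExecutorService.shutdownNow (a.k.a. abrupt shutdown) attempts to cancel running tasks and returns the list of tasks that had not yet started

  • shutdown starts an orderly shutdown: previously submitted tasks are allowed to finish and new ones are rejected; the call itself does not block

  • awaitTermination(timeout, unit) blocks the current thread until all tasks have finished after a shutdown request, the timeout elapses, or the thread is interrupted, e.g.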

    boolean checkMail(Set<String> hosts, long timeout, TimeUnit unit)
        throws InterruptedException {
        ExecutorService exec = Executors.newCachedThreadPool();
        final AtomicBoolean hasNewMail = new AtomicBoolean(false);
        try {
            for (final String host : hosts)
                exec.execute(new Runnable() {
                    public void run() {
                        if (checkMail(host))
                            hasNewMail.set(true);
                    }
                });
        } finally {
            exec.shutdown();
            exec.awaitTermination(timeout, unit);
        }
        return hasNewMail.get();
    }
  • A tracking executor service keeps track of the tasks cancelled at shutdown; the use case is to save their state so they can be restarted later

    public class TrackingExecutor extends AbstractExecutorService {
        private final ExecutorService exec;
        private final Set<Runnable> tasksCancelledAtShutdown =
                Collections.synchronizedSet(new HashSet<Runnable>());
        // ...
        public List<Runnable> getCancelledTasks() {
            if (!exec.isTerminated())
                throw new IllegalStateException(...);
            return new ArrayList<Runnable>(tasksCancelledAtShutdown);
        }
        public void execute(final Runnable runnable) {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        runnable.run();
                    } finally {
                        if (isShutdown()
                                && Thread.currentThread().isInterrupted())
                            tasksCancelledAtShutdown.add(runnable);
                    }
                }
            });
        }
    // delegate other ExecutorService methods to exec
    }
  • idempotent: performing a task twice has the same effect as performing it once

  • Thread leakage: a thread throws an uncaught RuntimeException and terminates, so the application silently loses it

    • the exception is just printed to the console, like e.printStackTrace()
  • How thread pools solve this problem:

    public void run() {
        Throwable thrown = null;
        try {
            while (!isInterrupted())
                runTask(getTaskFromWorkQueue());
        } catch (Throwable e) {
            thrown = e;
        } finally {
            threadExited(this, thrown);
        }
    }
  • Another approach is UncaughtExceptionHandler

    • global Thread.setDefaultUncaughtExceptionHandler
    • specific thread t1.setUncaughtExceptionHandler
    • for thread pools can be set through ThreadFactory
    • if no handler exists, the default behavior is to print the stack trace to System.err
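A minimal sketch of installing an UncaughtExceptionHandler on pool threads through a ThreadFactory. The class name HandlerPerPoolThread and the handler body are assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class HandlerPerPoolThread {
    /** Runs a failing task with execute and returns the message seen by the handler. */
    static String demo() {
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch handled = new CountDownLatch(1);
        // Every thread created for the pool gets the same handler.
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable);
            t.setUncaughtExceptionHandler((thread, ex) -> {
                seen.set(ex.getMessage());
                handled.countDown();
            });
            return t;
        };
        ExecutorService exec = Executors.newFixedThreadPool(1, factory);
        try {
            // execute (not submit): the exception escapes the worker thread.
            exec.execute(() -> { throw new IllegalStateException("task failed"); });
            handled.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            exec.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("handler saw: " + demo());
    }
}
```

With submit instead of execute the exception is stored in the returned Future and surfaces from get as an ExecutionException, so the handler is not invoked.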
  • Another approach is to override afterExecute hook in ThreadPoolExecutor

  • JVM shutdown

    • orderly: SIGINT (Ctrl+C), kill -SIGINT 12345 or kill -2 12345
    • abrupt: SIGKILL, kill -9 12345
    • Runtime.addShutdownHook registers an unstarted thread that the JVM starts during an orderly shutdown
    • there is no guarantee on the order in which shutdown hooks are executed
    • an abrupt shutdown halts the JVM, ignoring shutdown hooks
  • Daemon threads

    • JVM exits without waiting for them to finish
    • May not call finally block
    • May not close resource
    • Not good for IO
    • Good for local cache cleanup
  • Java finalizers run in a JVM-managed thread when the object is garbage collected

    • can be used when the object manages resources held by native methods
    • the best approach is not to use them
    • prefer Closeable with try-with-resources
  • Java can't forcibly stop a thread, but it provides a cooperative interruption mechanism

  • Runtime.availableProcessors is the main factor in determining pool size

  • When a ThreadPoolExecutor is created, the core threads are not started immediately but as tasks are submitted, unless prestartAllCoreThreads is called
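The lazy start of core threads can be checked with getPoolSize. This is a small sketch; the class name PrestartDemo and the pool size of 3 are arbitrary choices.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PrestartDemo {
    /** Returns {pool size after construction, threads started, pool size after prestart}. */
    static int[] demo() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 3, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        try {
            int before = pool.getPoolSize();             // no task submitted yet
            int started = pool.prestartAllCoreThreads(); // starts the idle core threads
            int after = pool.getPoolSize();
            return new int[] { before, started, after };
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " -> started " + r[1] + " -> " + r[2]);
    }
}
```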

  • newCachedThreadPool factory sets the maximum pool size to Integer.MAX_VALUE and the core pool size to zero with a timeout of one minute, uses SynchronousQueue

  • newFixedThreadPool factory sets both the core pool size and the maximum pool size to the requested pool size, uses LinkedBlockingQueue

  • A large queue coupled with a small pool can help reduce memory usage, CPU usage, and context switching, at the cost of potentially constraining throughput

  • saturation policy - When a bounded work queue fills up, the saturation policy comes into play

    • setRejectedExecutionHandler
    • built-in policies: AbortPolicy (the default), CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy
      • AbortPolicy causes execute to throw the unchecked RejectedExecutionException; the caller can catch this exception and implement its own overflow handling as it sees fit
      • CallerRunsPolicy runs the rejected task in the thread that called execute, which also slows down task submission
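A sketch of CallerRunsPolicy on a deliberately tiny pool (one thread, one queue slot). The class name CallerRunsDemo is hypothetical; the latch only keeps the worker busy so that the third task is guaranteed to be rejected.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {
    /** Returns true if the task rejected by the saturated pool ran in the submitting thread. */
    static boolean overflowRunsInCaller() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<Thread> ranOn = new AtomicReference<>();
        try {
            pool.execute(() -> {              // occupies the only worker thread
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.execute(() -> { });          // fills the one-slot queue
            // The pool is saturated now, so the policy runs this task right here.
            pool.execute(() -> ranOn.set(Thread.currentThread()));
            return ranOn.get() == Thread.currentThread();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran in caller: " + overflowRunsInCaller());
    }
}
```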
  • unconfigurableExecutorService is a decorator that exposes only the methods of ExecutorService, so the executor cannot be further configured

    • newSingleThreadExecutor is already wrapped in this manner
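  • Executors returned by newCachedThreadPool, newFixedThreadPool and newScheduledThreadPool can be cast to ThreadPoolExecutor to change their configuration; the one returned by newSingleThreadExecutor cannot, because it is wrapped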
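The difference between a plain and a wrapped executor can be seen with instanceof. This is a sketch with a hypothetical class name; the Javadoc does not promise the concrete return type of newFixedThreadPool, so the first result reflects the current JDK implementation rather than a contract.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

class ExecutorCastDemo {
    /** Returns {fixed pool is a ThreadPoolExecutor, wrapped pool is a ThreadPoolExecutor}. */
    static boolean[] demo() {
        ExecutorService fixed = Executors.newFixedThreadPool(2);
        ExecutorService wrapped =
                Executors.unconfigurableExecutorService(Executors.newFixedThreadPool(2));
        try {
            if (fixed instanceof ThreadPoolExecutor) {
                // Reconfiguration is possible after the cast.
                ((ThreadPoolExecutor) fixed).setMaximumPoolSize(4);
            }
            return new boolean[] {
                fixed instanceof ThreadPoolExecutor,
                wrapped instanceof ThreadPoolExecutor
            };
        } finally {
            fixed.shutdown();
            wrapped.shutdown();
        }
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("fixed castable: " + r[0] + ", wrapped castable: " + r[1]);
    }
}
```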

  • ThreadPoolExecutor can be extended

    • beforeExecute - called before the task; if it throws a RuntimeException, neither the task nor afterExecute is executed
    • afterExecute - called whether the task returns normally or throws an Exception; the Throwable argument is null on normal completion
  • Thread pool extended with logging and timing

    public class TimingThreadPool extends ThreadPoolExecutor {
        private final ThreadLocal<Long> startTime
                = new ThreadLocal<Long>();
        private final Logger log = Logger.getLogger("TimingThreadPool");
        private final AtomicLong numTasks = new AtomicLong();
        private final AtomicLong totalTime = new AtomicLong();
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            log.fine(String.format("Thread %s: start %s", t, r));
            startTime.set(System.nanoTime());
        }
        protected void afterExecute(Runnable r, Throwable t) {
            try {
                long endTime = System.nanoTime();
                long taskTime = endTime - startTime.get();
                numTasks.incrementAndGet();
                totalTime.addAndGet(taskTime);
                log.fine(String.format("Thread %s: end %s, time=%dns",
                        t, r, taskTime));
            } finally {
                super.afterExecute(r, t);
            }
        }
        protected void terminated() {
            try {
                log.info(String.format("Terminated: avg time=%dns",
                        totalTime.get() / numTasks.get()));
            } finally {
                super.terminated();
            }
        }
    }
  • Concurrent puzzle solver

    public class ConcurrentPuzzleSolver<P, M> {
        private final Puzzle<P, M> puzzle;
        private final ExecutorService exec;
        private final ConcurrentMap<P, Boolean> seen;
        final ValueLatch<Node<P, M>> solution
                = new ValueLatch<Node<P, M>>();
            // ...
        public List<M> solve() throws InterruptedException {
            try {
                P p = puzzle.initialPosition();
                exec.execute(newTask(p, null, null));
            // block until solution found
                Node<P, M> solnNode = solution.getValue();
                return (solnNode == null) ? null : solnNode.asMoveList();
            } finally {
                exec.shutdown();
            }
        }
        protected Runnable newTask(P p, M m, Node<P,M> n) {
            return new SolverTask(p, m, n);
        }
        class SolverTask extends Node<P, M> implements Runnable {
            // ...
            public void run() {
                if (solution.isSet()
                        || seen.putIfAbsent(pos, true) != null)
                    return; // already solved or seen this position
                if (puzzle.isGoal(pos))
                    solution.setValue(this);
                else
                    for (M m : puzzle.legalMoves(pos))
                        exec.execute(
                                newTask(puzzle.move(pos, m), m, this));
            }
        }
    }
    // ---
    @ThreadSafe
    public class ValueLatch<T> {
        @GuardedBy("this") private T value = null;
        private final CountDownLatch done = new CountDownLatch(1);
        public boolean isSet() {
            return (done.getCount() == 0);
        }
        public synchronized void setValue(T newValue) {
            if (!isSet()) {
                value = newValue;
                done.countDown();
            }
        }
        public T getValue() throws InterruptedException {
            done.await();
            synchronized (this) {
                return value;
            }
        }
    }

GUI

  • The Swing single-thread rule: Swing components and models should be created, modified, and queried only from the event-dispatching thread

  • The Swing event thread can be thought of as a single-threaded Executor

Threads

  • daemon thread - housekeeping thread that does not prevent the JVM from exiting; it is abandoned when the JVM shuts down

    • use case: cache cleaning
  • versioned data model - e.g. CopyOnWriteArrayList iterator traverses the collection as it existed when the iterator was created
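A short sketch of the snapshot behavior of a CopyOnWriteArrayList iterator; the class name SnapshotIteration is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class SnapshotIteration {
    /** Returns the elements seen by an iterator created before a later add. */
    static List<String> iterateWhileModifying() {
        List<String> list = new CopyOnWriteArrayList<>(Arrays.asList("a", "b"));
        Iterator<String> it = list.iterator(); // snapshot of the backing array
        list.add("c");                         // copies the array; the iterator keeps the old one
        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(iterateWhileModifying()); // prints [a, b]
    }
}
```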

  • some native libraries require that all access to the library, even loading the library with System.loadLibrary, be made from the same thread

    • use a single-threaded executor for accessing the native library, and provide a proxy object that intercepts calls to the thread-confined object and submits them as tasks to the dedicated thread. Future and newSingleThreadExecutor work together to make this easy

Liveness hazard - deadlock

  • resource deadlock

  • thread-starvation deadlock

  • Solutions:

    • Lock order

      private static final Object tieLock = new Object();
      public void transferMoney(final Account fromAcct,
      final Account toAcct,
      final DollarAmount amount)
          throws InsufficientFundsException {
          class Helper {
              public void transfer() throws InsufficientFundsException {
                  if (fromAcct.getBalance().compareTo(amount) < 0)
                      throw new InsufficientFundsException();
                  else {
                      fromAcct.debit(amount);
                      toAcct.credit(amount);
                  }
              }
          }
          int fromHash = System.identityHashCode(fromAcct);
          int toHash = System.identityHashCode(toAcct);
          if (fromHash < toHash) {
              synchronized (fromAcct) {
                  synchronized (toAcct) {
                      new Helper().transfer();
                  }
              }
          } else if (fromHash > toHash) {
              synchronized (toAcct) {
                  synchronized (fromAcct) {
                      new Helper().transfer();
                  }
              }
          } else {
              synchronized (tieLock) {
                  synchronized (fromAcct) {
                      synchronized (toAcct) {
                          new Helper().transfer();
                      }
                  }
              }
          }
      }
  • Thread dumps include information about threads and locks that they hold

    • JVM generates thread dump on SIGQUIT signal kill -3
  • relying on thread priorities is not recommended because their behavior is platform-specific

    • Thread.sleep or Thread.yield calls in odd places are sometimes used instead, in an attempt to give more time to lower-priority threads
      • Thread.yield is only a hint to the runtime to trigger a context switch and may be a no-op [JLS 17.9]
      • a short but nonzero Thread.sleep is slower but more reliable
  • Starvation: threads compete for CPU time and one of them gets too little, e.g. the GUI event thread gets less CPU time because of a heavy background task, and as a result the GUI freezes

  • Livelock: a message is taken from the queue, processing fails with a retryable error, the message is put back at the head of the queue, and the cycle repeats forever without progress

    • another example is optimistic lock that retries constantly
  • Amdahl's Law is a formula used to predict the theoretical maximum speedup of a computational task when only part of the task can be parallelized, while the rest must be executed serially.
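Amdahl's Law can be written as speedup <= 1 / (F + (1 - F) / N), where F is the fraction of the work that must run serially and N is the number of processors. A tiny calculator (the class name Amdahl is just for illustration):

```java
class Amdahl {
    /** Upper bound on the speedup for a serial fraction f on n processors. */
    static double speedup(double serialFraction, int processors) {
        return 1.0 / (serialFraction + (1.0 - serialFraction) / processors);
    }

    public static void main(String[] args) {
        // 10% serial work on 10 processors gives at most about 5.26x, not 10x.
        System.out.printf("%.2f%n", speedup(0.10, 10));
        // Even with very many processors the bound approaches 1 / F.
        System.out.printf("%.2f%n", speedup(0.10, 1_000_000));
    }
}
```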

  • a contended synchronized block may cause the JVM to suspend the thread and perform a context switch

  • non-blocking approaches, used for example in AtomicInteger, avoid this problem and let threads use their full scheduling quantum

  • Lock contention: many threads try to acquire the same lock and wait for it, which turns a parallel program into a sequential one

  • There are three ways to reduce lock contention:

    • Reduce the duration for which locks are held;
    • Reduce the frequency with which locks are requested; or
    • Replace exclusive locks with coordination mechanisms that permit greater concurrency.
  • Lock striping: a big data structure is guarded by several locks, each protecting its own portion, so that multiple threads can work with it in parallel, e.g. ConcurrentHashMap with a lock per bucket
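A minimal sketch of lock striping, assuming an array of counters guarded by 16 stripe locks. The class StripedCounters is hypothetical and much simpler than ConcurrentHashMap.

```java
class StripedCounters {
    private static final int N_LOCKS = 16;
    private final Object[] locks = new Object[N_LOCKS];
    private final long[] counts;

    StripedCounters(int slots) {
        counts = new long[slots];
        for (int i = 0; i < N_LOCKS; i++) {
            locks[i] = new Object();
        }
    }

    /** Slots guarded by different stripes can be updated in parallel. */
    void increment(int slot) {
        synchronized (locks[slot % N_LOCKS]) {
            counts[slot]++;
        }
    }

    long get(int slot) {
        synchronized (locks[slot % N_LOCKS]) {
            return counts[slot];
        }
    }

    /** Two threads increment the same slot; the stripe lock keeps the count exact. */
    static long demo() {
        StripedCounters counters = new StripedCounters(64);
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) counters.increment(5);
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counters.get(5);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```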

  • java.util.concurrent.locks.ReadWriteLock enforces a multiple-reader, single-writer locking discipline: more than one reader can access the shared resource concurrently so long as none of them wants to modify it, but writers must acquire the lock exclusively

  • vmstat and mpstat on Unix are used for testing CPU

  • iostat or perfmon for IO

Comparing scalability of Map implementations

  • scheduling quantum: the period of time the thread scheduler allocates to a thread to execute its instructions
  • saturation: producers generate jobs faster than consumers can process them

Performance can be measured by:

  • Throughput: the rate at which a set of concurrent tasks is completed;
  • Responsiveness: the delay between a request for and completion of some action (also called latency); or
  • Scalability: the improvement in throughput (or lack thereof) as more resources (usually CPUs) are made available.

Other metrics:

  • variance of service time - What percentage of operations will succeed in under 100 milliseconds?

    • e.g. a Semaphore with fairness=true has lower variance at the cost of lower throughput
    • with fairness=false, which is the default, throughput is higher but variance is higher too
  • Thread.getState should never be used in program logic: a thread in the WAITING or TIMED_WAITING state may temporarily transition to RUNNABLE even if the condition it is waiting for is not yet true

    • It should be used only for debugging purposes

Testing concurrency

Medium-quality random number generator suitable for testing (Marsaglia, 2003)

static int xorShift(int y) {
    y ^= (y << 6);
    y ^= (y >>> 21);
    y ^= (y << 7);
    return y;
}

Java RNG (random number generation): most random number generator classes are thread-safe and therefore introduce additional synchronization, so they are poor candidates for use in concurrency tests

  • contention - a bottleneck introduced by too much synchronization (often by synchronizing things that do not require it)

Why creating threads in a loop for tests or for measuring performance is a BAD idea

  • Thread creation is expensive: by the time T2 is started, T1 will already have done a lot of work

Solution

  • CyclicBarrier
  • CountDownLatch
  • ThreadPoolExecutor + prestartAllCoreThreads()
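A sketch of the CountDownLatch variant: all threads are created first and then released together through a start gate, so thread creation time is not part of the measurement. The class name StartGateHarness is hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class StartGateHarness {
    /** Runs task in nThreads threads released together; returns elapsed nanoseconds. */
    static long timeTasks(int nThreads, Runnable task) {
        CountDownLatch startGate = new CountDownLatch(1);
        CountDownLatch endGate = new CountDownLatch(nThreads);
        for (int i = 0; i < nThreads; i++) {
            new Thread(() -> {
                try {
                    startGate.await(); // block until the gate is opened
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    endGate.countDown();
                }
            }).start();
        }
        long start = System.nanoTime();
        startGate.countDown();         // release all threads at once
        try {
            endGate.await();           // wait for the last thread to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return System.nanoTime() - start;
    }

    static int demo() {
        AtomicInteger runs = new AtomicInteger();
        timeTasks(4, runs::incrementAndGet);
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("elapsed ns: " + timeTasks(4, () -> { }));
    }
}
```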

HotSpot can be instructed to ignore System.gc calls with -XX:+DisableExplicitGC

How to detect

Find memory leaks in java / play with *.hprof (heap dumps)

  • be cautious with heap dumps because they can contain sensitive information, e.g. passwords

  • see / run com.iv127.kotlin.starter.concurrency_in_practice.MemoryLeakTest

  • run jmap -dump:live,format=b,file=myheapdump.hprof PID

  • open and import in eclipse-mat (Eclipse Memory Analyzer)

  • see reference https://www.youtube.com/watch?v=t_-WyfS9a7k

  • Retained Heap means how much memory will be freed if this instance is garbage collected

    • Show Retained Set

Performance test

  • Extended version of functionality testing

  • see com.iv127.kotlin.starter.concurrency_in_practice.BoundedBufferTest#testPutTakeConcurrently

  • A problem that affects perf. tests is GC, solutions:

    • make sure GC does not run at all during the test (check with -verbose:gc)
    • make the test run long enough that GC represents only a small fraction of the total run time
  • Another problem is JIT (just-in-time) compilation

    • make the test run long enough that JIT compilation represents only a small fraction of the total run time
    • -XX:+PrintCompilation shows JIT compilation in console
  • Dead code elimination

    • tricking the optimizer into not optimizing away your benchmark as dead code
      // a trick that prevents the optimizer from eliminating the computation as dead code
      if (foo.x.hashCode() == System.nanoTime())
              System.out.print(" ");
    • (The print method buffers output until println is called, so in the rare case that hashCode and System.nanoTime are equal no I/O is actually performed.)

JIT

  • JIT compilation is triggered when the execution count of the same code section exceeds a threshold
  • JIT based on profiling information
  • The JVM may make optimizations based on assumptions that may only be true temporarily, and later back them out by invalidating the compiled code if they become untrue

AOP

Can be useful to detect non-compliance with thread confinement, e.g. "(Laddad, 2003) provides an example of using an aspect to wrap all calls to non-thread-safe Swing methods with the assertion that the call is occurring in the event thread"

Profilers and monitoring tools

  • JMX Agent provides some limited features for monitoring thread behavior

Explicit Locks

  • Lock interface
  • ReentrantLock
    • the most important advantage is that a thread can be interrupted while waiting to acquire the lock (lockInterruptibly)
    • reentrant, with the possibility to enable fairness (only worthwhile when really needed and for long-running tasks)
    • polled, non-blocking acquisition: boolean tryLock()
    • timed acquisition: boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

Lock must be released in finally block, otherwise, the lock would never be released in case of exception

Lock lock = new ReentrantLock();
//...
lock.lock();
try {
// update object state
// catch exceptions and restore invariants if necessary
} finally {
    lock.unlock();
}
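A sketch of timed acquisition with tryLock(long, TimeUnit): the caller gives up after the timeout instead of blocking forever. The class TimedLockDemo and the 50 ms timeout are assumptions made for illustration.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class TimedLockDemo {
    private final ReentrantLock lock = new ReentrantLock();
    private int value;

    /** Returns false when the lock could not be acquired within the timeout. */
    boolean tryIncrement(long timeout, TimeUnit unit) throws InterruptedException {
        if (!lock.tryLock(timeout, unit)) {
            return false;
        }
        try {
            value++;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Calls tryIncrement from a second thread and returns its result. */
    static boolean attemptFromOtherThread(TimedLockDemo demo) {
        boolean[] result = new boolean[1];
        Thread t = new Thread(() -> {
            try {
                result[0] = demo.tryIncrement(50, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result[0];
    }

    /** Returns {attempt while the lock is held elsewhere, attempt after release}. */
    static boolean[] demo() {
        TimedLockDemo demo = new TimedLockDemo();
        boolean whileHeld;
        demo.lock.lock();
        try {
            whileHeld = attemptFromOtherThread(demo); // times out
        } finally {
            demo.lock.unlock();
        }
        return new boolean[] { whileHeld, attemptFromOtherThread(demo) };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("while held: " + r[0] + ", after release: " + r[1]);
    }
}
```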
  • Lock also supports polled acquisition with tryLock, which can be used to back off and retry instead of deadlocking

    public boolean transferMoney(Account fromAcct,
            Account toAcct,
            DollarAmount amount,
            long timeout,
            TimeUnit unit) throws InsufficientFundsException, InterruptedException {
        long fixedDelay = getFixedDelayComponentNanos(timeout, unit);
        long randMod = getRandomDelayModulusNanos(timeout, unit);
        long stopTime = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            if (fromAcct.lock.tryLock()) {
                try {
                    if (toAcct.lock.tryLock()) {
                        try {
                            if (fromAcct.getBalance().compareTo(amount)
                                    < 0)
                                throw new InsufficientFundsException();
                            else {
                                fromAcct.debit(amount);
                                toAcct.credit(amount);
                                return true;
                            }
                        } finally {
                            toAcct.lock.unlock();
                        }
                    }
                } finally {
                    fromAcct.lock.unlock();
                }
            }
            if (System.nanoTime() >= stopTime) // time budget used up: give up
                return false;
            NANOSECONDS.sleep(fixedDelay + rnd.nextLong() % randMod);
        }
    }
  • ReadWriteLock interface

  • ReentrantReadWriteLock

    • supports fairness
    • if a writer is waiting for the lock, no more readers are allowed to acquire the read lock until the writer is served
    public class ReadWriteMap<K,V> {
        private final Map<K,V> map;
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private final Lock r = lock.readLock();
        private final Lock w = lock.writeLock();
        public ReadWriteMap(Map<K,V> map) {
            this.map = map;
        }
        public V put(K key, V value) {
            w.lock();
            try {
                return map.put(key, value);
            } finally {
                w.unlock();
            }
        }
        // Do the same for remove(), putAll(), clear()
        public V get(Object key) {
            r.lock();
            try {
                return map.get(key);
            } finally {
                r.unlock();
            }
        }
        // Do the same for other read-only Map methods
    }
  • Object.wait|notify|notifyAll a.k.a. intrinsic condition queues

    • wait - "I want to go to sleep, but wake me when something interesting happens"

    • notify|notifyAll - "something interesting happened"

    • wait releases the lock while the thread sleeps; after a notify, the waiting thread must reacquire the lock before wait returns, just like entering a synchronized block

    • wait can return even though no other thread called notify (spurious wakeup), so it must always be called in a loop that rechecks the condition

      while (!condition) {
        wait();
      }
      void stateDependentMethod() throws InterruptedException {
          // condition predicate must be guarded by lock
          synchronized(lock) {
              while (!conditionPredicate())
                  lock.wait();
          // object is now in desired state
          }
      }
  • There is a problem called missed signals, e.g. a kitchen with multiple devices that share a single bell

    • notify is error-prone: it wakes a single arbitrary waiter, which may be waiting for a different condition
    • prefer notifyAll
      • but notifyAll wakes all waiting threads, each of which acquires the lock and context-switches, which is expensive; an alternative is Lock.newCondition

The equivalents of wait, notify, and notifyAll for Condition objects are await, signal, and signalAll.
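A sketch of a bounded buffer with two Condition objects on one Lock, so that producers and consumers wait on separate queues and a single signal is enough. It follows the well-known textbook shape; the demo method and the capacity of 2 are assumptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class ConditionBoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items;
    private int head, tail, count;

    ConditionBoundedBuffer(int capacity) {
        items = new Object[capacity];
    }

    void put(T x) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length) notFull.await(); // releases the lock while waiting
            items[tail] = x;
            tail = (tail + 1) % items.length;
            count++;
            notEmpty.signal(); // only a consumer can be waiting on this condition
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0) notEmpty.await();
            T x = (T) items[head];
            items[head] = null;
            head = (head + 1) % items.length;
            count--;
            notFull.signal();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /** A producer thread puts 1..5, the caller takes five items and returns their sum. */
    static int demo() {
        ConditionBoundedBuffer<Integer> buffer = new ConditionBoundedBuffer<>(2);
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) buffer.put(i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        int sum = 0;
        try {
            for (int i = 0; i < 5; i++) sum += buffer.take();
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```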

  • AbstractQueuedSynchronizer (AQS) is a framework that java uses to implement ReentrantLock, CountDownLatch, Semaphore, ReentrantReadWriteLock, SynchronousQueue and FutureTask
    • and many other classes from java.util.concurrent

    • ReentrantReadWriteLock implementation uses AQS, operations on the read lock use the shared acquire and release methods; operations on the write lock use the exclusive acquire and release methods.

Atomic variable and non-blocking synchronization

  • compare-and-swap (CAS): a low-level atomic machine instruction used by the Atomic* classes

    • on processors that lack it, the JVM uses a pair of instructions instead: load-linked and store-conditional
    • CAS(V, A, B): V is the memory location, A the expected old value, B the new value; V is set to B only if it currently holds A, otherwise nothing is changed
    • these low-level primitives are exposed through the atomic variable classes
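A sketch of the usual CAS retry loop on top of AtomicInteger.compareAndSet: read the current value, compute the new one, and retry if another thread changed the value in between. The class BoundedCasCounter is hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;

class BoundedCasCounter {
    private final AtomicInteger value = new AtomicInteger();
    private final int max;

    BoundedCasCounter(int max) {
        this.max = max;
    }

    /** Increments unless the limit is reached; retries when another thread wins the race. */
    boolean tryIncrement() {
        while (true) {
            int current = value.get();      // read V and remember it as A
            if (current >= max) {
                return false;
            }
            if (value.compareAndSet(current, current + 1)) { // V = B only if V is still A
                return true;
            }
        }
    }

    int get() {
        return value.get();
    }

    /** Two threads attempt 1600 increments in total against a limit of 1000. */
    static int demo() {
        BoundedCasCounter counter = new BoundedCasCounter(1000);
        Runnable work = () -> {
            for (int i = 0; i < 800; i++) counter.tryIncrement();
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```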
  • pessimistic approach: acquire the lock first, then proceed

  • optimistic approach: relies on collision detection; if the data was changed during the operation, the operation is retried

  • there are no Atomic* classes for float and double, so the bits are stored in an AtomicInteger or AtomicLong via Float.floatToIntBits and Double.doubleToLongBits
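A minimal sketch of an atomic double built on AtomicLong with Double.doubleToLongBits and Double.longBitsToDouble. The class AtomicDoubleSketch is hypothetical and not a JDK class.

```java
import java.util.concurrent.atomic.AtomicLong;

class AtomicDoubleSketch {
    // The double is stored as its raw 64-bit representation.
    private final AtomicLong bits = new AtomicLong(Double.doubleToLongBits(0.0));

    double get() {
        return Double.longBitsToDouble(bits.get());
    }

    /** Atomically adds delta and returns the new value. */
    double addAndGet(double delta) {
        while (true) {
            long current = bits.get();
            double next = Double.longBitsToDouble(current) + delta;
            if (bits.compareAndSet(current, Double.doubleToLongBits(next))) {
                return next;
            }
        }
    }

    static double demo() {
        AtomicDoubleSketch total = new AtomicDoubleSketch();
        for (int i = 0; i < 4; i++) {
            total.addAndGet(0.5);
        }
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 2.0
    }
}
```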

  • Atomic(Integer|Long|Reference)Array

    • allows atomic operation on array elements

Usage of AtomicReference

private final AtomicReference<IntPair> values =
        new AtomicReference<IntPair>(new IntPair(0, 0));
public int getLower() { return values.get().lower; }
public int getUpper() { return values.get().upper; }
public void setLower(int i) {
    while (true) {
        IntPair oldv = values.get();
        if (i > oldv.upper)
            throw new IllegalArgumentException(
                    "Can’t set lower to " + i + " > upper");
        IntPair newv = new IntPair(i, oldv.upper);
        if (values.compareAndSet(oldv, newv))
            return;
    }
}

pseudo-random number generator (PRNG)

  • the next “random” number is a deterministic function of the previous number, so a PRNG must remember the previous number as part of its state
@ThreadSafe
public class AtomicPseudoRandom extends PseudoRandom {
    private AtomicInteger seed;
    AtomicPseudoRandom(int seed) {
        this.seed = new AtomicInteger(seed);
    }
    public int nextInt(int n) {
        while (true) {
            int s = seed.get();
            int nextSeed = calculateNext(s);
            if (seed.compareAndSet(s, nextSeed)) {
                int remainder = s % n;
                return remainder > 0 ? remainder : remainder + n;
            }
        }
    }
}

Replacing synchronized with atomics

@ThreadSafe
public class ConcurrentStack <E> {
    AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
    public void push(E item) {
        Node<E> newHead = new Node<E>(item);
        Node<E> oldHead;
        do {
            oldHead = top.get();
            newHead.next = oldHead;
        } while (!top.compareAndSet(oldHead, newHead));
    }
    public E pop() {
        Node<E> oldHead;
        Node<E> newHead;
        do {
            oldHead = top.get();
            if (oldHead == null)
                return null;
            newHead = oldHead.next;
        } while (!top.compareAndSet(oldHead, newHead));
        return oldHead.item;
    }
    private static class Node <E> {
        public final E item;
        public Node<E> next;
        public Node(E item) {
            this.item = item;
        }
    }
}

The ABA problem

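  • a change of V from A to B and then back to A is not visible to a compare-and-set
    • solutions:
      • AtomicStampedReference: conditional update on a pair of a reference and an integer version stamp, which makes it immune to the ABA problem
      • AtomicMarkableReference: conditional update on a pair of a reference and a boolean mark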
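A single-threaded simulation of the ABA sequence: a plain AtomicReference accepts the stale compare-and-set, while AtomicStampedReference rejects it because the stamp has moved on. The class AbaDemo and the stamp values are assumptions made for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;

class AbaDemo {
    /** Returns {plain CAS succeeded, stamped CAS succeeded} after an A -> B -> A change. */
    static boolean[] demo() {
        AtomicReference<String> plain = new AtomicReference<>("A");
        AtomicStampedReference<String> stamped = new AtomicStampedReference<>("A", 0);

        // "Thread 1" reads the current state and is then delayed.
        String seenPlain = plain.get();
        int[] stampHolder = new int[1];
        String seenStamped = stamped.get(stampHolder);
        int seenStamp = stampHolder[0];

        // "Thread 2" changes A -> B -> A in the meantime.
        plain.set("B");
        plain.set("A");
        stamped.set("B", 1);
        stamped.set("A", 2);

        // The plain CAS cannot tell that anything happened.
        boolean plainCas = plain.compareAndSet(seenPlain, "C");
        // The stamped CAS fails because the stamp is 2, not 0.
        boolean stampedCas =
                stamped.compareAndSet(seenStamped, "C", seenStamp, seenStamp + 1);
        return new boolean[] { plainCas, stampedCas };
    }

    public static void main(String[] args) {
        boolean[] r = demo();
        System.out.println("plain: " + r[0] + ", stamped: " + r[1]);
    }
}
```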

Java Memory Model

The Java Memory Model is specified in terms of actions, which include reads and writes to variables, locks and unlocks of monitors, and starting and joining with threads. The JMM defines a partial ordering called happens-before on all actions within the program. To guarantee that the thread executing action B can see the results of action A (whether or not A and B occur in different threads), there must be a happens-before relationship between A and B. In the absence of a happens-before ordering between two operations, the JVM is free to reorder them as it pleases.

The rules for happens-before:

  • Program order rule. Each action in a thread happens-before every action in that thread that comes later in the program order.

  • Monitor lock rule. An unlock on a monitor lock happens-before every subsequent lock on that same monitor lock.

  • Volatile variable rule. A write to a volatile field happens-before every subsequent read of that same field.

  • Thread start rule. A call to Thread.start on a thread happens-before every action in the started thread.

  • Thread termination rule. Any action in a thread happens-before any other thread detects that the thread has terminated, either by successfully returning from Thread.join or by Thread.isAlive returning false.

  • Interruption rule. A thread calling interrupt on another thread happens-before the interrupted thread detects the interrupt (either by having InterruptedException thrown, or invoking isInterrupted or interrupted).

  • Finalizer rule. The end of a constructor for an object happens-before the start of the finalizer for that object.

  • Transitivity. If A happens-before B, and B happens-before C, then A happens-before C.
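
The rules above can be combined. A small sketch, with hypothetical names (HappensBeforeDemo, payload, ready), of a non-volatile value being published safely through a volatile flag: the program order rule, the volatile variable rule, and transitivity together guarantee that the reader sees 42.

```java
// Illustrative only: HappensBeforeDemo and its members are hypothetical names.
final class HappensBeforeDemo {
    private static int payload;             // deliberately NOT volatile
    private static volatile boolean ready;  // volatile flag that publishes payload

    // Returns the payload as observed by the reader thread.
    static int publishAndRead() {
        payload = 0;
        ready = false;
        int[] observed = new int[1];

        Thread reader = new Thread(() -> {
            while (!ready) {                // volatile read
                Thread.yield();
            }
            observed[0] = payload;          // ordered after the writer's payload = 42
        });
        Thread writer = new Thread(() -> {
            payload = 42;                   // program order: before the volatile write
            ready = true;                   // volatile write
        });

        // Thread start rule: the resets above are visible to both threads.
        reader.start();
        writer.start();
        try {
            writer.join();
            reader.join();                  // thread termination rule: observed[0] is visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observed[0];
    }

    public static void main(String[] args) {
        System.out.println(publishAndRead()); // 42
    }
}
```

If ready were not volatile, there would be no happens-before edge between the two threads: the reader could spin forever or, after seeing ready == true, still read a stale payload.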

Double Checked Locking

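  • is considered an anti-pattern because
    • Thread A sees resource == null, takes the lock, and initializes the Resource
    • Thread B, doing its first check without the lock, may see a non-null reference to a partially constructed Resource, because there is no happens-before ordering between A's writes and B's unsynchronized read
    • declaring resource volatile fixes it (Java 5.0 and later)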
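
A minimal sketch of the volatile variant described above. Resource and ResourceHolder are hypothetical names, and the name field is deliberately non-final so that its visibility depends on the volatile write alone.

```java
// Illustrative only: Resource and ResourceHolder are hypothetical names.
final class Resource {
    String name;                             // non-final on purpose

    Resource(String name) {
        this.name = name;
    }
}

final class ResourceHolder {
    // volatile: the write of the reference happens-before any later read of it,
    // so a thread that sees a non-null reference also sees the constructed fields.
    private static volatile Resource resource;

    static Resource getInstance() {
        Resource local = resource;           // first check, no lock
        if (local == null) {
            synchronized (ResourceHolder.class) {
                local = resource;            // second check, under the lock
                if (local == null) {
                    local = new Resource("expensive");
                    resource = local;        // volatile write publishes the object
                }
            }
        }
        return local;
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true
    }
}
```

When the resource is static, the lazy initialization holder class idiom (a private static nested class whose static field holds the instance) gives the same laziness with less code and no explicit locking.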

Initialization safety

  • all threads see the values written to final fields by the constructor (immutability), regardless of how the properly constructed object is published
    • All writes to final fields made by the constructor, as well as to any variables reachable through those fields, become “frozen” when the constructor completes, and any thread that obtains a reference to that object is guaranteed to see a value that is at least as up to date as the frozen value.
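
A small sketch of a class that relies on initialization safety. SafeStates and its contents are illustrative assumptions: the HashMap is not thread-safe, yet it can be read from any thread without synchronization because it is reachable only through a final field and is never modified after the constructor returns.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: SafeStates and its entries are assumptions made for this sketch.
final class SafeStates {
    // final field: the map and its contents are "frozen" when the constructor completes
    private final Map<String, String> states;

    SafeStates() {
        states = new HashMap<>();
        states.put("alaska", "AK");
        states.put("wyoming", "WY");
        // 'this' must not escape from the constructor, or the guarantee is lost
    }

    // Safe without locking only because the map is never modified after construction.
    String getAbbreviation(String stateName) {
        return states.get(stateName);
    }

    public static void main(String[] args) {
        System.out.println(new SafeStates().getAbbreviation("alaska")); // AK
    }
}
```

The guarantee covers only what is reachable through final fields as of the end of the constructor; a non-final field, or a later put on the map, would need synchronization again.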

Quotes

  • "NASA devotes more of its engineering resources to testing (it is estimated they employ 20 testers for each developer) than any commercial entity could afford to—and the code produced is still not free of defects. In complex programs, no amount of testing can find all coding errors."

Python POC

cd python_pocs
python -m venv venv
source venv/bin/activate
# deactivate
