Java 21 – Structured Concurrency

Let’s say you want to perform a group of tasks in Java.

And all these tasks are part of a single unit of work.

And all these tasks can be executed independently.

How can this be done in Java?

You can execute each task sequentially.

So you execute task 1, then task 2 and so on.

Each task is blocking code, so the total time taken to execute all the tasks is the sum of the time taken by each task.

How can we optimize this?

The problem statement says these tasks can be executed independently.

So you can execute each task concurrently.

And once all the tasks are completed, you can take their outputs and do the final processing.

How to implement this?

Threads are the basic unit of concurrency in Java.

You can execute each task in its own thread.

You can use the Executor framework provided by Java.

You can create an executor instance using Executors.newCachedThreadPool().

And then submit each of your tasks wrapped in a Callable.

There is a limitation with this approach.

Java doesn’t consider the separate tasks as part of a single unit of work.

So if one of the tasks fails, the other tasks keep executing, though ideally we would want to cancel the rest of the tasks.

Let’s take an example.

Let’s create two tasks.

Task 1 returns an operand and task 2 returns another operand.

You just add these two operands.

Let’s introduce some lag in each method to simulate a long-running operation (using the Thread.sleep() method).

We will do this operation using the different patterns available in Java.

No Concurrency

Here is the straightforward code without concurrency:

    public static void noconcurrency() {
        try {
            // The two calls run one after the other on the calling thread
            int first = getFirst();
            int second = getSecond();
            System.out.println("The sum is " + (first + second));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static int getFirst() throws Exception {
        // Simulate a slow operation
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 100");
        return 100;
    }
    private static int getSecond() throws Exception {
        // Simulate a slow operation
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 400");
        return 400;
    }

The above code is sequential.

Since we sleep for 5 seconds while getting each operand, the total time taken to calculate the sum in the above case is 10 seconds!

Now we can optimize this.

getFirst() and getSecond() are independent operations.

Neither has to wait for the other.

So let’s convert them to concurrent tasks.

Unstructured Concurrency

Until Java 21, the best way to implement concurrency in Java was to use the Executor framework.

Here you convert the two tasks to Callables and then submit them to the executor.

Here is some sample code:

private static void unstructuredConcurrency() {
        try (var executor = Executors.newCachedThreadPool()) {
            Future<Integer> firstTask = executor.submit(() -> getFirst());
            Future<Integer> secondTask = executor.submit(() -> getSecond());
            int first = firstTask.get();
            int second = secondTask.get();
            // Parenthesize the addition; otherwise the operands are concatenated as strings ("100400")
            System.out.println("The sum is " + (first + second));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static int getFirst() throws Exception {
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 100");
        return 100;
    }
    private static int getSecond() throws Exception {
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 400");
        return 400;
    }

Each task is converted to a Callable using a lambda expression.

They are then submitted to the executor, which runs them concurrently.

The executor returns a Future object for each Callable.

You then retrieve the result by calling Future.get() on each returned future.

Finally you add the results.

Since getFirst() and getSecond() run concurrently, the above code took about 5 seconds to run on my machine.
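To make the difference between the two timings concrete, here is a minimal sketch that measures both approaches. It is not part of the original sample: the class name SequentialVsConcurrent and the helper slowTask() are hypothetical stand-ins for getFirst() and getSecond(), and the sleeps are shortened so the demo finishes quickly.

```java
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class SequentialVsConcurrent {

    // Hypothetical stand-in for getFirst()/getSecond(): sleeps, then returns a value.
    static Callable<Integer> slowTask(int value, long millis) {
        return () -> {
            Thread.sleep(millis);
            return value;
        };
    }

    // Runs both tasks one after the other and returns the elapsed milliseconds.
    static long timeSequential(long millis) {
        long start = System.nanoTime();
        try {
            int sum = slowTask(100, millis).call() + slowTask(400, millis).call();
            System.out.println("Sequential sum is " + sum);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return Duration.ofNanos(System.nanoTime() - start).toMillis();
    }

    // Submits both tasks to an executor and returns the elapsed milliseconds.
    static long timeConcurrent(long millis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        long start = System.nanoTime();
        try {
            Future<Integer> first = executor.submit(slowTask(100, millis));
            Future<Integer> second = executor.submit(slowTask(400, millis));
            int sum = first.get() + second.get();
            System.out.println("Concurrent sum is " + sum);
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdown();
        }
        return Duration.ofNanos(System.nanoTime() - start).toMillis();
    }

    public static void main(String[] args) {
        System.out.println("Sequential ms: " + timeSequential(300));
        System.out.println("Concurrent ms: " + timeConcurrent(300));
    }
}
```

The sequential run should take roughly the sum of the two sleeps, while the concurrent run should take roughly the longer of the two.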

There is a limitation in the above code.

If one of the methods fails, the other continues to execute.

Here is an example:

I replaced the getSecond() method with a getSecondException() method, which throws an exception.

The getFirst() method is the same.

It sleeps for 5 seconds, prints “Returning 100” and then returns 100.

    private static int getFirst() throws Exception {
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 100");
        return 100;
    }
    private static int getSecondException() throws Exception {
        // Fails immediately with an ArithmeticException; the lines below never run
        int a = 5 / 0;
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 400");
        return 400;
    }
    private static void unstructuredConcurrencyException() {
        try (var executor = Executors.newCachedThreadPool()) {
            Future<Integer> firstTask = executor.submit(() -> getFirst());
            Future<Integer> secondTask = executor.submit(() -> getSecondException());
            int first = firstTask.get();
            // The failure of the second task only surfaces here, as an ExecutionException
            int second = secondTask.get();
            System.out.println("The sum is " + (first + second));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Here is the output of the above code:

Notice that getFirst() still runs to completion even though getSecondException() throws an error. (The message “Returning 100” is printed after 5 seconds, although the error is thrown well before that.)

Ignore the “Self-suppression not permitted” message. Since StructuredTaskScope throws an exception, it overrides the exception thrown by the code, and so the warning is displayed.
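With the Executor framework you have to write the cancellation logic yourself if you want the remaining tasks to stop on the first failure. Here is one possible sketch of that, not taken from the original sample: it assumes an ExecutorCompletionService to observe tasks in completion order, and the names ManualFailFast, sumOrCancel() and slowTaskWasCancelled() are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class ManualFailFast {

    // Runs the tasks concurrently; on the first failure, cancels the rest and rethrows.
    static int sumOrCancel(ExecutorService executor, List<Callable<Integer>> tasks) throws Exception {
        CompletionService<Integer> completion = new ExecutorCompletionService<>(executor);
        List<Future<Integer>> futures = new ArrayList<>();
        for (Callable<Integer> task : tasks) {
            futures.add(completion.submit(task));
        }
        int sum = 0;
        try {
            for (int i = 0; i < tasks.size(); i++) {
                // take() hands back futures in completion order, so a fast failure is seen first
                sum += completion.take().get();
            }
            return sum;
        } catch (ExecutionException | InterruptedException e) {
            for (Future<Integer> future : futures) {
                future.cancel(true); // interrupts tasks that are still running
            }
            throw e;
        }
    }

    // Demo: returns true if the slow task was interrupted before it could finish.
    static boolean slowTaskWasCancelled() {
        ExecutorService executor = Executors.newCachedThreadPool();
        AtomicBoolean finished = new AtomicBoolean(false);
        Callable<Integer> slow = () -> {
            Thread.sleep(2000);
            finished.set(true);
            return 100;
        };
        Callable<Integer> failing = () -> {
            throw new IllegalStateException("second task failed");
        };
        try {
            sumOrCancel(executor, List.of(slow, failing));
        } catch (Exception e) {
            System.out.println("Failed fast: " + e.getCause());
        } finally {
            executor.shutdown();
        }
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Slow task cancelled: " + slowTaskWasCancelled());
    }
}
```

This works, but every caller has to remember the bookkeeping (tracking the futures, cancelling them, shutting down the executor), which is the kind of boilerplate that Structured Concurrency is meant to remove.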

Now let’s replace the above code with Structured Concurrency from Java 21.

Structured Concurrency

Structured Concurrency treats all the independent tasks of a single unit of work as one unit.

So if any of the tasks fails, it cancels the remaining tasks.

So in the above case, getFirst() will be cancelled before it prints its message.

Here is the code for structured concurrency:

 private static void structuredConcurrency() {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> firstTask = scope.fork(() -> getFirst());
            Supplier<Integer> secondTask = scope.fork(() -> getSecond());
            scope.join().throwIfFailed();
            int first = firstTask.get();
            int second = secondTask.get();
            int sum = first + second;
            System.out.println("The sum is :" + sum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Notice the class “StructuredTaskScope”.

This is the backbone of Structured Concurrency.

You first create a new scope by creating an instance of this class.

In the above example we create the instance using the constructor of StructuredTaskScope.ShutdownOnFailure, a subclass of StructuredTaskScope.

This makes sure the remaining tasks are cancelled as soon as any one of the tasks fails. They are also cancelled if the parent task (in the above case, the block of code within the try statement) fails and the scope is closed.

Instead of submitting to an executor service, you pass a Callable for each task to the scope.fork() method.

This forks a new task within the given scope.

This method returns a StructuredTaskScope.Subtask, which implements Supplier, instead of a Future object.

As soon as you fork a task, it starts executing.

Then you call scope.join(), which waits until all the tasks in that scope have completed or the scope has been shut down.

scope.join().throwIfFailed() throws an exception if any of the tasks failed; by that point the scope has already shut down the remaining tasks.

Finally you get the output of each task using the Supplier.get() method.

This method is non-blocking (you get the output immediately).

Both tasks will already have completed by the time the blocking scope.join() call returns.

Java doesn’t return a Future object here because Future was designed for unstructured concurrency (for example, Future.get() is a blocking call).

Now let’s see how error cases are handled in Structured Concurrency.

Structured Concurrency – Error scenario

Now let’s use Structured Concurrency for the error scenario we discussed before.

Let us replace getSecond() method with getSecondException() which throws an exception immediately.

Here is the code:

 private static void structuredConcurrencyException() {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> firstTask = scope.fork(() -> getFirst());
            Supplier<Integer> secondTask = scope.fork(() -> getSecondException());
            scope.join().throwIfFailed();
            int first = firstTask.get();
            int second = secondTask.get();
            int sum = first + second;
            System.out.println("The sum is :" + sum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Here is the output:

Notice that the message from getFirst() is not printed in this case as it gets cancelled immediately after getSecondException() throws an exception.

This saves time and prevents unnecessary code execution.

You may not always want to wait till all tasks complete.

In some cases you may want to shut down as soon as any one of the tasks completes.

Let’s see it in the next section.

Structured Concurrency – Return on success of a single task

Let’s say you have a collection of services.

And you want the result from any one of them.

As soon as you get one you can quit the remaining calls.

You can do that using the StructuredTaskScope.ShutdownOnSuccess class.

Here is some sample code:

 private static void structuredConcurrencySuccess() {
        // Declaring the result type on the scope avoids the cast on result()
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Integer>()) {
            scope.fork(() -> getFirst());
            scope.fork(() -> getSecondSlower());
            int result = scope.join().result();
            System.out.println("The result is :" + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static int getSecondSlower() throws Exception {
        Thread.sleep(Duration.ofSeconds(6));
        System.out.println("Returning 400");
        return 400;
    }

As you can see in the above code, instead of the getSecond() method we call getSecondSlower(), which is one second slower.

And we are not using the Supplier.get() method to get the result.

In this case scope.join().result() directly returns the result of the first task that completes successfully.

As expected, getFirst() completes first and its result is returned immediately.
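For comparison, the Executor framework has a similar "first result wins" operation in ExecutorService.invokeAny(), which returns the result of one task that completed successfully and cancels the tasks that have not finished. The sketch below is only an illustration of that older API, not the structured version; the class FirstResultWins and the method firstResult() are hypothetical names, and the sleeps are shortened.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FirstResultWins {

    // Runs a fast and a slow task and returns whichever result arrives first.
    static int firstResult(long fastMillis, long slowMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Callable<Integer> fast = () -> {
            Thread.sleep(fastMillis);
            return 100;
        };
        Callable<Integer> slow = () -> {
            Thread.sleep(slowMillis);
            return 400;
        };
        try {
            // invokeAny blocks until one task succeeds, then cancels the unfinished ones
            return executor.invokeAny(List.of(fast, slow));
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("The result is :" + firstResult(100, 1000));
    }
}
```

ShutdownOnSuccess gives you the same behaviour, with the added guarantee that no forked task outlives the try block that owns the scope.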

You can also specify a deadline within which all the tasks should get completed.

Let’s see it in the next section.

Structured Concurrency – With Deadline

In structured concurrency you can specify a deadline within which all the tasks should get completed.

Here is an example:

   private static void structuredConcurrencyDeadline() {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> firstTask = scope.fork(() -> getFirst());
            Supplier<Integer> secondTask = scope.fork(() -> getSecondSlower());
            scope.joinUntil(Instant.now().plusSeconds(5)).throwIfFailed();
            int first = firstTask.get();
            int second = secondTask.get();
            int sum = first + second;
            System.out.println("The sum is :" + sum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

Notice that there is only one change.

Instead of scope.join() you use scope.joinUntil().

And you specify the deadline using an Instant object.

If the tasks don’t complete within that time, a TimeoutException is thrown.

The getSecondSlower() method takes more than 5 seconds in the above example, so the timeout exception is thrown as expected!
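Note that joinUntil() takes an absolute point in time (an Instant), not a relative timeout. To illustrate the difference, here is a small sketch that applies the same idea to a plain Future by converting an absolute deadline into the remaining wait time. It is an assumption-laden illustration rather than the structured API itself: DeadlineSketch, awaitUntil() and run() are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineSketch {

    // Waits for the future until the absolute deadline; cancels it if the deadline passes.
    static String awaitUntil(Future<Integer> future, Instant deadline) {
        long remainingMillis = Math.max(0, Duration.between(Instant.now(), deadline).toMillis());
        try {
            return "result " + future.get(remainingMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the task that missed the deadline
            return "timeout";
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    // Starts a task that sleeps for taskMillis and waits for it until now + deadlineMillis.
    static String run(long taskMillis, long deadlineMillis) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            Future<Integer> future = executor.submit(() -> {
                Thread.sleep(taskMillis);
                return 400;
            });
            return awaitUntil(future, Instant.now().plusMillis(deadlineMillis));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(1000, 200)); // the task is slower than the deadline
        System.out.println(run(100, 1000)); // the task finishes in time
    }
}
```

With joinUntil() you get this for the whole group of tasks at once: one deadline covers every fork in the scope, and the unfinished tasks are cancelled when the scope is closed.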

We saw two patterns in Structured Concurrency.

One cancels all the tasks if any of the tasks fails (ShutdownOnFailure).

The other returns as soon as any one of the tasks succeeds and cancels all the other tasks (ShutdownOnSuccess).

You can also write your own custom scope by extending StructuredTaskScope.

A custom scope could, for example, ignore partial failures.

Here is the consolidated code for all the use cases discussed above:

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;
public class App {
    public static void main(String[] args) {
        noconcurrency();
        unstructuredConcurrency();
        unstructuredConcurrencyException();
        structuredConcurrency();
        structuredConcurrencyException();
        structuredConcurrencySuccess();
        structuredConcurrencyDeadline();
    }
    private static void noconcurrency() {
        try {
            long start = System.currentTimeMillis();
            int first = getFirst();
            int second = getSecond();
            long end = System.currentTimeMillis();
            System.out.println("The sum is " + (first + second));
            System.out.println("Time taken " + (end - start) / 1000);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void structuredConcurrency() {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> firstTask = scope.fork(() -> getFirst());
            Supplier<Integer> secondTask = scope.fork(() -> getSecond());
            scope.join().throwIfFailed();
            int first = firstTask.get();
            int second = secondTask.get();
            int sum = first + second;
            System.out.println("The sum is :" + sum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void structuredConcurrencyException() {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> firstTask = scope.fork(() -> getFirst());
            Supplier<Integer> secondTask = scope.fork(() -> getSecondException());
            scope.join().throwIfFailed();
            int first = firstTask.get();
            int second = secondTask.get();
            int sum = first + second;
            System.out.println("The sum is :" + sum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void structuredConcurrencyDeadline() {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<Integer> firstTask = scope.fork(() -> getFirst());
            Supplier<Integer> secondTask = scope.fork(() -> getSecondSlower());
            scope.joinUntil(Instant.now().plusSeconds(5)).throwIfFailed();
            int first = firstTask.get();
            int second = secondTask.get();
            int sum = first + second;
            System.out.println("The sum is :" + sum);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void structuredConcurrencySuccess() {
        // Declaring the result type on the scope avoids the cast on result()
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<Integer>()) {
            scope.fork(() -> getFirst());
            scope.fork(() -> getSecondSlower());
            int result = scope.join().result();
            System.out.println("The result is :" + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void unstructuredConcurrency() {
        try (var executor = Executors.newCachedThreadPool()) {
            long start = System.currentTimeMillis();
            Future<Integer> firstTask = executor.submit(() -> getFirst());
            Future<Integer> secondTask = executor.submit(() -> getSecond());
            int first = firstTask.get();
            int second = secondTask.get();
            long end = System.currentTimeMillis();
            System.out.println("Time taken " + (end - start) / 1000);
            // Parenthesize the addition; otherwise the operands are concatenated as strings
            System.out.println("The sum is " + (first + second));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static void unstructuredConcurrencyException() {
        try (var executor = Executors.newCachedThreadPool()) {
            long start = System.currentTimeMillis();
            Future<Integer> firstTask = executor.submit(() -> getFirst());
            Future<Integer> secondTask = executor.submit(() -> getSecondException());
            int first = firstTask.get();
            int second = secondTask.get();
            long end = System.currentTimeMillis();
            System.out.println("Time taken " + (end - start) / 1000);
            System.out.println("The sum is " + (first + second));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static int getFirst() throws Exception {
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 100");
        return 100;
    }
    private static int getSecond() throws Exception {
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 400");
        return 400;
    }
    private static int getSecondException() throws Exception {
        int a = 5 / 0;
        Thread.sleep(Duration.ofSeconds(5));
        System.out.println("Returning 400");
        return 400;
    }
    private static int getSecondSlower() throws Exception {
        Thread.sleep(Duration.ofSeconds(6));
        System.out.println("Returning 400");
        return 400;
    }
}

That’s it!

Note: Structured Concurrency is a preview feature in Java 21, so you need to enable preview features when you compile and run the code (for example, javac --enable-preview --release 21 App.java followed by java --enable-preview App).

Reference:

https://openjdk.org/jeps/453

