The best way to run asynchronous threads in Java had been to use the Executor Service framework until Java 8.
Executor Service returns a Future object which can be used to retrieve the result of an asynchronous thread.
But it comes with few drawbacks.
Among the many features introduced in Java 8 , one of the major features was the CompletableFuture class. This class overcomes the drawbacks of the Future class. This post explains how.
Before that,
What is a CompletableFuture class?
A CompletableFuture class is also a Future class. It implements the Future interface. So all the functionalities provided by Future are also available in CompletableFuture class. In addition a Completable Future as the name suggests can be completed whereas a Future cannot . It means if you think a thread is taking too much time , you can complete it midway with a result you assign and return , whereas a Future object cannot do that. It can only cancel a thread if it isn’t returning a response in time.
Below are the major differences between Future and CompletableFuture illustrated with examples:
A Future Object is Blocking , A Completable Future is non blocking.
Let’s execute two methods asynchronously . They both just return a string and are manually made to sleep for a second before returning the message:
public class CompleteableFutureDemo {
public String sampleThread1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello Concurrency. I am the first thread";
}
public String sampleThread2() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello Concurrency. I am the second thread";
}
}
Let’s execute the first thread using Executor Service framework:
CompleteableFutureDemo d = new CompleteableFutureDemo();
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(() -> d.sampleThread1());
String message = future.get();
System.out.println(message);
I have created an executor instance, submitted the method sampleThread1() to the executor thus created. It returns a future object. I am then calling the method get() on future object which returns the result. Finally I am printing the message returned.
Let’s see how to achieve this using the CompletableFuture class:
CompletableFuture.supplyAsync(() -> d.sampleThread1()).thenAccept(m ->
System.out.println(m));
Much smaller , just a single line of code!
In the above code the method supplyAsync() is used to create a CompletableFuture instance which returns a result.
It takes a Supplier functional interface as a parameter.
I have passed the method sampleThread1() through a Supplier instance created by using lamda expression.
Finally the method thenAccept() is called on the response of the method supplyAsync() which consumes the response and prints it to the console.
thenAccept() is non blocking because it is a callback method. Whenever the result is available this callback method is invoked and it prints the result.
On the contrary , the get() method on Future object is blocking. It is not a callback method and hence will suspend the current thread of execution.
This can be understood by calling the two methods sampleThread1() and sampleThread2() in succession:
CompleteableFutureDemo d = new CompleteableFutureDemo();
// Using Executor service
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(() -> d.sampleThread1());
String message = future.get();
System.out.println(message);
Future<String> future2 = executor.submit(() -> d.sampleThread2());
String message2 = future2.get();
System.out.println(message2);
When I executed the above code , the message “Hello Concurrency. I am the first thread” got printed first , then after a second the message “Hello Concurrency. I am the second thread” got printed.
Here is the code execution using CompletableFuture:
CompletableFuture.supplyAsync(() -> d.sampleThread1()).thenAccept(m ->
System.out.println(m));
CompletableFuture.supplyAsync(() -> d.sampleThread2()).thenAccept(m ->
System.out.println(m));
In this case both the messages got printed at the same time , since they were printed using callback method (thenAccept())
There is a work around for resolving this problem with Future object though. Instead of calling the get() method of future object immediately after submitting it, you can call it after submitting the second thread (sampleThread2()). You can call the get() method on the two future objects together like this:
CompleteableFutureDemo d = new CompleteableFutureDemo();
// Using Executor service
ExecutorService executor = Executors.newCachedThreadPool();
Future<String> future = executor.submit(() -> d.sampleThread1());
Future<String> future2 = executor.submit(() -> d.sampleThread2());
String message = future.get();
System.out.println(message);
String message2 = future2.get();
System.out.println(message2);
In this way , though the get() method on the first future blocks the current thread of execution , by the time it completes , the second thread would also have completed and hence calling get() on the second future immediately returns the result. This happened because both the threads took one second to complete.
Even if the threads took different time to complete , the total execution time will be the time taken by the longest running thread.
So from user perspective the total time taken for both the threads will be the same whether you use executor service or CompletableFuture.
BUT,
if there is any line of code after the get() method of the second future it will be blocked until both the futures complete. It won’t be blocked if you use CompletableFuture.
Threads cannot be combined using Future objects, they can be combined using Completable Future
Say in the above example , you want to execute the method sampleThread2() after the method sampleThread1() returns. Let’s say the second method takes the message returned by the first method and returns it along with its own message.
How to do this using Executor framework?
You can’t combine two threads in executor framework.
As a workaround , you can create a facade method which invokes the two methods in the desired sequence like this :
public String sampleThread1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello Concurrency. I am the first thread";
}
public String sampleThread2(String message) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return message+"\n"+"Hello Concurrency. I am the second thread";
}
String facade(){
String message1 = sampleThread1();
return sampleThread2(message1);
}
And then submit the facade method to the executor:
Future<String> combinedFuture = executor.submit(() -> d.facade());
String result = combinedFuture.get();
System.out.println(result);
This again is a blocking code , when you execute combinedFuture.get() the current thread is blocked until the two methods inside the facade complete.
CompletableFuture provides a better mechanism to run threads in a pipleline. You can use the method thenApply() to achieve this.
CompletableFuture.supplyAsync(() -> d.sampleThread1())
.thenApply(message -> d.sampleThread2(message))
.thenAccept(finalMsg -> System.out.println(finalMsg));
As you see , you can trigger the first thread sampleThread1() using supplyAsync() method .
It returns a type of CompletableFuture<String> .
On this response you can invoke the method thenApply() and invoke the second thread sampleThread2() by passing the message returned by the first thread.
Finally you can consume the final output using thenAccept() method which accepts a Consumer functional interface.
The above code snippet is non blocking and whatever code is written after that immediately starts executing.
A thread cannot be completed midway using Future object , they can be completed using Completable Future object
Consider the sampleThread1 method again:
public String sampleThread1() {
System.out.println("Inside sample thread1");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Hello Concurrency. I am the first thread";
}
Using executor service , you cannot return from this thread midway with a custom message. The thread can only be cancelled.
But using Completable Future , for specific conditions (say if you haven’t got the response for a long time ) you can assign your own value and return.
Consider this code:
CompletableFuture<String> compFuture = CompletableFuture.supplyAsync(() -> d.sampleThread1());
String message = compFuture.get();
System.out.println(message);
The get() method returns the message “Hello Concurrency. I am the first thread” message.
But if you want to return another message you can do so using complete() method :
CompletableFuture<String> compFuture = CompletableFuture.supplyAsync(() -> d.sampleThread1());
// due to some condition , completing the thread midway:
compFuture.complete("Hello Concurrency! This is Completable Future");
String message = compFuture.get();
System.out.println(message);
The above code snippet returns the message “Hello Concurrency! This is Completable Future” when get() method is called instead of the actual message returned by the string.
This is not possible with Future object.
Further to these differences, you can trigger multiple threads in parallel , wait for all of them to complete and process their outputs together using allOf() method provided by CompletableFuture. ExecutorService doesn’t provide a counterpart.
Also CompletableFuture provides exception handling whereas executor service does not.
CompletableFuture is quite big a feature in Java 8. I have only touch based on the important features here , enough I believe to get a peek of what it is capable of.
Note : If you run the above code in a stand-alone java class the main thread may exit before the asynchronous threads created by Completable future returns . To see the output keep the main thread running ( I kept it running by executing the Executor service (executor.submit()) and not shutting it down )
Leave a Reply