





















































In this article by Javier Fernández González, author of the book Java 9 Concurrency Cookbook - Second Edition, we will cover how to run tasks asynchronously.
When you execute a ForkJoinTask in a ForkJoinPool, you can do it synchronously or asynchronously. In the synchronous case, the method that sends the task to the pool doesn't return until the task has finished its execution. In the asynchronous case, the method that sends the task to the pool returns immediately, so the caller can continue with its own execution.
You should be aware of a big difference between the two approaches. When you use the synchronous methods, the task that calls one of them (for example, the invokeAll() method) is suspended until the tasks it sent to the pool finish their execution. This allows the ForkJoinPool class to use the work-stealing algorithm to assign a new task to the worker thread that was executing the suspended task. In contrast, when you use the asynchronous methods (for example, the fork() method), the task continues with its execution, so the ForkJoinPool class can't use the work-stealing algorithm to increase the performance of the application. In this case, the ForkJoinPool class can use that algorithm only when you call the join() or get() methods to wait for the finalization of a task.
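As a rough illustration of the two styles, the following sketch runs the same divide-and-conquer sum both ways. The RangeSum class, its threshold, and its async flag are hypothetical names made up for this example; they are not part of the article's program.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task (not from the article): sums the integers in [from, to).
class RangeSum extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000;
    private final int from;
    private final int to;
    private final boolean async;

    RangeSum(int from, int to, boolean async) {
        this.from = from;
        this.to = to;
        this.async = async;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = from + (to - from) / 2;
        RangeSum left = new RangeSum(from, mid, async);
        RangeSum right = new RangeSum(mid, to, async);
        if (async) {
            // Asynchronous: fork() returns immediately, so this task keeps running.
            left.fork();
            right.fork();
        } else {
            // Synchronous: invokeAll() returns only when both subtasks are done.
            invokeAll(left, right);
        }
        // join() returns at once for a finished task and waits otherwise.
        return right.join() + left.join();
    }

    static long sum(int n, boolean async) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(0, n, async));
    }

    public static void main(String[] args) {
        System.out.println(sum(100_000, false)); // synchronous variant
        System.out.println(sum(100_000, true));  // asynchronous variant
    }
}
```

Both variants compute the same value; the difference lies only in whether the parent task is suspended while its subtasks run.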
In addition to the RecursiveAction and RecursiveTask classes, Java 8 introduced a new kind of ForkJoinTask, the CountedCompleter class. With this kind of task, you can include a completion action that is executed once the task has been launched and has no pending child tasks. This mechanism is based on a method included in the class (the onCompletion() method) and a counter of pending tasks.
This counter is initialized to zero by default, and you can increment it atomically whenever you need to. Normally, you increment it by one each time you launch a child task. Finally, when a task has finished its execution, you can try to complete it by calling the tryComplete() method. If the pending count is greater than zero, it is decremented by one. If it's zero, the onCompletion() method is executed and then the task tries to complete its parent task.
In this article, you will learn how to use the asynchronous methods provided by the ForkJoinPool and CountedCompleter classes for the management of tasks. You are going to implement a program that searches for files with a given extension inside a folder and its subfolders. The CountedCompleter class you're going to implement will process the content of a folder. For each subfolder inside that folder, it will send a new task to the ForkJoinPool class in an asynchronous way. For each file inside that folder, the task will check the extension of the file and add the file to the result list if the extension matches. When a task is completed, it will insert the result lists of all its child tasks into its own result list.
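To make the counter mechanism concrete, here is a minimal sketch in which a root task launches a few leaf tasks and every task records its name when its onCompletion() method runs. NodeTask and its fields are hypothetical names chosen for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Hypothetical task: a node that launches `children` leaf tasks.
class NodeTask extends CountedCompleter<Void> {
    private final String name;
    private final int children;
    private final List<String> log;

    NodeTask(CountedCompleter<?> parent, String name, int children, List<String> log) {
        super(parent); // the pending count starts at zero
        this.name = name;
        this.children = children;
        this.log = log;
    }

    @Override
    public void compute() {
        for (int i = 0; i < children; i++) {
            addToPendingCount(1); // one more child to wait for
            new NodeTask(this, name + "." + i, 0, log).fork();
        }
        // Decrements the count if it is above zero; otherwise runs
        // onCompletion() and then tries to complete the parent.
        tryComplete();
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        log.add(name);
    }

    static List<String> run(int children) {
        List<String> log = new CopyOnWriteArrayList<>();
        ForkJoinPool.commonPool().invoke(new NodeTask(null, "root", children, log));
        return log;
    }

    public static void main(String[] args) {
        // The leaf names may appear in any order; "root" is logged last.
        System.out.println(run(3));
    }
}
```

The root's completion action runs only after every child has completed, because each child runs its own onCompletion() before it touches the root's counter.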
Follow these steps to implement the example:
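import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountedCompleter;

// Processes one folder; every subfolder is handed to a child task.
public class FolderProcessor extends CountedCompleter<List<String>> {
    private static final long serialVersionUID = -1826436670135695513L;
    private String path;
    private String extension;
    private List<FolderProcessor> tasks;   // child tasks, one per subfolder
    private List<String> resultList;       // matching files found so far

    // Used for child tasks: links the new task to its parent (the completer).
    protected FolderProcessor(CountedCompleter<?> completer, String path, String extension) {
        super(completer);
        this.path = path;
        this.extension = extension;
    }

    // Used for root tasks, which have no parent.
    public FolderProcessor(String path, String extension) {
        this.path = path;
        this.extension = extension;
    }

    // compute() is public in CountedCompleter, so the override must be public too.
    @Override
    public void compute() {
        resultList = new ArrayList<>();
        tasks = new ArrayList<>();
        File file = new File(path);
        File[] content = file.listFiles();
        if (content != null) {
            for (int i = 0; i < content.length; i++) {
                if (content[i].isDirectory()) {
                    FolderProcessor task = new FolderProcessor(this, content[i].getAbsolutePath(), extension);
                    tasks.add(task);
                    // Increment the pending count before forking, so a fast child
                    // cannot complete this task before the child is counted.
                    addToPendingCount(1);
                    task.fork();
                } else {
                    if (checkFile(content[i].getName())) {
                        resultList.add(content[i].getAbsolutePath());
                    }
                }
            }
            if (tasks.size() > 50) {
                System.out.printf("%s: %d tasks ran.\n", file.getAbsolutePath(), tasks.size());
            }
        }
        tryComplete();
    }

    // Runs when this task and all its child tasks have finished.
    @Override
    public void onCompletion(CountedCompleter<?> completer) {
        for (FolderProcessor childTask : tasks) {
            resultList.addAll(childTask.getResultList());
        }
    }

    private boolean checkFile(String name) {
        return name.endsWith(extension);
    }

    public List<String> getResultList() {
        return resultList;
    }

    // join() returns this value; the default implementation returns null.
    @Override
    public List<String> getRawResult() {
        return resultList;
    }
}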
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        // One root task per folder; backslashes must be escaped in Java strings.
        FolderProcessor system = new FolderProcessor("C:\\Windows", "log");
        FolderProcessor apps = new FolderProcessor("C:\\Program Files", "log");
        FolderProcessor documents = new FolderProcessor("C:\\Documents And Settings", "log");
        // execute() returns immediately; the tasks run asynchronously.
        pool.execute(system);
        pool.execute(apps);
        pool.execute(documents);
        // Print the status of the pool every second until the three tasks finish.
        do {
            System.out.printf("******************************************\n");
            System.out.printf("Main: Parallelism: %d\n", pool.getParallelism());
            System.out.printf("Main: Active Threads: %d\n", pool.getActiveThreadCount());
            System.out.printf("Main: Task Count: %d\n", pool.getQueuedTaskCount());
            System.out.printf("Main: Steal Count: %d\n", pool.getStealCount());
            System.out.printf("******************************************\n");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } while ((!system.isDone()) || (!apps.isDone()) || (!documents.isDone()));
        pool.shutdown();
        List<String> results;
        results = system.join();
        System.out.printf("System: %d files found.\n", results.size());
        results = apps.join();
        System.out.printf("Apps: %d files found.\n", results.size());
        results = documents.join();
        System.out.printf("Documents: %d files found.\n", results.size());
    }
}
The following screenshot shows part of an execution of this example:
The key to this example is the FolderProcessor class. Each task processes the content of a folder. As you know, this content has two kinds of elements: files and other folders.
If the task finds a folder, it creates another FolderProcessor object to process that folder and sends it to the pool using the fork() method. This method sends the task to the pool, which executes it if it has a free worker thread or can create a new one. The method returns immediately, so the task can continue processing the content of the folder. For every file, the task compares its extension with the one it's looking for and, if they are equal, adds the absolute path of the file to the list of results.
Once the task has processed all the content of its folder, it tries to complete itself by calling the tryComplete() method. As we explained in the introduction of this article, this call checks the value of the pending task counter. If this value is greater than 0, the counter is decremented by one. If the value is 0, the task executes its onCompletion() method and then tries to complete its parent task. In our case, every subfolder a task finds adds one child task, launched with the fork() method, and one unit to the counter of pending tasks. So, when a task has processed all its content, the counter equals the number of its child tasks that have not finished yet.
When the task calls the tryComplete() method and some child tasks are still pending, the call only decrements the counter; the onCompletion() method is executed later, when the last child task completes. If the folder of the current task has no subfolders, the counter is zero, so the onCompletion() method is called immediately and the task then tries to complete its parent task. In this way, we create a tree of tasks from top to bottom that is completed from bottom to top. In the onCompletion() method, we take the result lists of the child tasks and add their elements to the result list of the current task.
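The same top-down launch and bottom-up completion can be tried without touching the file system. The sketch below replaces folders with a hypothetical in-memory map (TreeWalker, the map layout, and the entry names are all assumptions made for illustration) and gathers the results of the child tasks in the onCompletion() method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Hypothetical stand-in for a folder tree: each key is a "folder" and its value
// lists its entries; entries that are not keys themselves count as "files".
class TreeWalker extends CountedCompleter<List<String>> {
    private final Map<String, List<String>> tree;
    private final String folder;
    private final List<TreeWalker> children = new ArrayList<>();
    private final List<String> found = new ArrayList<>();

    TreeWalker(CountedCompleter<?> parent, Map<String, List<String>> tree, String folder) {
        super(parent);
        this.tree = tree;
        this.folder = folder;
    }

    @Override
    public void compute() {
        for (String entry : tree.getOrDefault(folder, List.of())) {
            if (tree.containsKey(entry)) {
                TreeWalker child = new TreeWalker(this, tree, entry);
                children.add(child);
                addToPendingCount(1); // count the child before launching it
                child.fork();
            } else {
                found.add(entry);
            }
        }
        tryComplete();
    }

    // Runs once this node and all of its children are done.
    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        for (TreeWalker child : children) {
            found.addAll(child.found);
        }
    }

    // Makes invoke() and join() return the collected list.
    @Override
    public List<String> getRawResult() {
        return found;
    }

    static List<String> walk(Map<String, List<String>> tree, String root) {
        return ForkJoinPool.commonPool().invoke(new TreeWalker(null, tree, root));
    }

    public static void main(String[] args) {
        Map<String, List<String>> tree = Map.of(
                "root", List.of("a.log", "sub"),
                "sub", List.of("b.log", "deep"),
                "deep", List.of("c.log"));
        System.out.println(walk(tree, "root"));
    }
}
```

Each node's list is complete only after its own onCompletion() has run, which is why a parent can safely read the lists of its children there.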
The ForkJoinPool class also allows the execution of tasks in an asynchronous way. You have used the execute() method to send the three initial tasks to the pool. In the Main class, you also shut down the pool using the shutdown() method and wrote information about the status and the evolution of the tasks that are running in it. The ForkJoinPool class includes more methods that can be useful for this purpose.
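As a small sketch of some of those other methods, the following example sends a few trivial jobs with execute(), prints the values of getPoolSize(), getRunningThreadCount(), and isQuiescent(), and then waits for the pool with awaitTermination(). The PoolStatusDemo class and the jobs themselves are hypothetical.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PoolStatusDemo {
    // Runs a few trivial jobs and returns how many of them finished.
    static int runJobs(int jobs) {
        ForkJoinPool pool = new ForkJoinPool(2);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < jobs; i++) {
            // execute(Runnable) returns immediately.
            pool.execute(() -> { finished.incrementAndGet(); });
        }
        System.out.printf("Pool size: %d, running threads: %d, quiescent: %b%n",
                pool.getPoolSize(), pool.getRunningThreadCount(), pool.isQuiescent());
        // shutdown() accepts no new tasks but lets submitted ones finish.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("Pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.printf("Terminated: %b%n", pool.isTerminated());
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println("Finished jobs: " + runJobs(4));
    }
}
```

The status values printed before shutdown() are snapshots, so they can differ from run to run.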
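In this example, we have used the addToPendingCount() method to increment the counter of pending tasks, but there are other methods we can use to change the value of this counter: setPendingCount() sets the value directly, compareAndSetPendingCount() sets it only if it currently holds an expected value, and decrementPendingCountUnlessZero() decrements it unless it is already zero.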
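The counter can be inspected and changed outside a pool as well, which makes these methods easy to try in isolation. In the sketch below, PendingCountDemo and its empty Noop task are hypothetical; the calls themselves (setPendingCount(), addToPendingCount(), compareAndSetPendingCount(), decrementPendingCountUnlessZero(), and getPendingCount()) are methods of the CountedCompleter class.

```java
import java.util.concurrent.CountedCompleter;

class PendingCountDemo {
    // Hypothetical task that does nothing; only its counter is of interest.
    static class Noop extends CountedCompleter<Void> {
        @Override
        public void compute() {
        }
    }

    // Returns {value returned by first decrement, by second decrement, final count}.
    static int[] demo() {
        Noop task = new Noop();
        task.setPendingCount(3);                                // count is 3
        task.addToPendingCount(2);                              // count is 5
        boolean swapped = task.compareAndSetPendingCount(5, 1); // 5 matches, count is 1
        System.out.println("Swapped: " + swapped);
        // Returns the count as it was before the call.
        int first = task.decrementPendingCountUnlessZero();     // count is 0
        int second = task.decrementPendingCountUnlessZero();    // stays at 0
        return new int[] {first, second, task.getPendingCount()};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```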
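The CountedCompleter class also includes other methods to manage the completion of tasks. The most significant ones are complete(), which executes the onCompletion() method regardless of the value of the pending count and then tries to complete the parent task, and onExceptionalCompletion(), which is executed when the completeExceptionally() method is called or the compute() method throws an exception.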
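To illustrate two of these methods, complete() and onExceptionalCompletion(), consider the following sketch. The CompletionHooksDemo class and its two nested tasks are hypothetical and exist only to show when each hook runs.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

class CompletionHooksDemo {
    // Completes itself even though its pending count is not zero.
    static class Early extends CountedCompleter<Void> {
        volatile boolean completionRan;

        @Override
        public void compute() {
            setPendingCount(5);
            complete(null); // runs onCompletion() regardless of the count
        }

        @Override
        public void onCompletion(CountedCompleter<?> caller) {
            completionRan = true;
        }
    }

    // Fails inside compute(), which triggers onExceptionalCompletion().
    static class Failing extends CountedCompleter<Void> {
        volatile boolean sawException;

        @Override
        public void compute() {
            throw new IllegalStateException("boom");
        }

        @Override
        public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
            sawException = true;
            return true; // true lets the exception propagate to the parent, if any
        }
    }

    static boolean earlyCompletionRuns() {
        Early task = new Early();
        ForkJoinPool.commonPool().invoke(task);
        return task.completionRan && task.isCompletedNormally();
    }

    static boolean exceptionHookRuns() {
        Failing task = new Failing();
        try {
            ForkJoinPool.commonPool().invoke(task);
        } catch (RuntimeException expected) {
            // invoke() rethrows the exception thrown by compute().
        }
        return task.sawException && task.isCompletedAbnormally();
    }

    public static void main(String[] args) {
        System.out.println(earlyCompletionRuns());
        System.out.println(exceptionHookRuns());
    }
}
```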
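In this example, you have used the join() method to wait for the finalization of tasks and get their results. You can also use one of the two versions of the get() method for this purpose: get(), which waits until the task has finished and returns its result, and get(long timeout, TimeUnit unit), which waits at most the specified time and throws a TimeoutException if the result is not available by then.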
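The sketch below shows both versions of the get() method on two hypothetical tasks (GetDemo, Answer, and Failing are names invented for this example). It also shows one practical difference from join(): get() reports a failure in the task as a checked ExecutionException whose cause is the exception thrown by the task, whereas join() throws an unchecked exception.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetDemo {
    // Hypothetical task that simply returns a value.
    static class Answer extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 42;
        }
    }

    // Hypothetical task that always fails.
    static class Failing extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            throw new IllegalStateException("boom");
        }
    }

    // Waits at most five seconds for the result.
    static int getWithTimeout() {
        ForkJoinTask<Integer> task = ForkJoinPool.commonPool().submit(new Answer());
        try {
            return task.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns the cause carried by the ExecutionException that get() throws.
    static Throwable causeOfFailure() {
        ForkJoinTask<Integer> task = ForkJoinPool.commonPool().submit(new Failing());
        try {
            task.get();
            return null; // not reached for a failing task
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return e;
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout());
        System.out.println(causeOfFailure());
    }
}
```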
In this article, we learned how to use the asynchronous methods provided by the ForkJoinPool and CountedCompleter classes for the management of tasks.