Java Callable and Future学习

在本教程中,我们将学习Callable and Future。

Callable

在先前的教程中,我们使用一个Runnable对象来定义在线程内执行的任务。尽管使用定义任务Runnable非常方便,但是由于任务无法返回结果而受到限制。

如果要从任务中返回结果怎么办?

 

好吧,Java提供了一个Callable接口来定义返回结果的任务。ACallable类似于,Runnable除了它可以返回结果并引发检查的异常。

Callable接口具有单个方法call(),该方法旨在包含线程执行的代码。这是一个简单的Callable的示例-

Callable<String> callable = new Callable<String>() {
    @Override
    public String call() throws Exception {
        // Perform some computation
        Thread.sleep(2000);
        return "Return some result";
    }
};

请注意,使用时Callable,您不需要Thread.sleep()被try / catch块包围,因为与Runnable不同,Callable可以引发已检查的异常。

您也可以像这样将lambda表达式与Callable一起使用-

Callable<String> callable = () -> {
    // Perform some computation
    Thread.sleep(2000);
    return "Return some result";
};
 

使用ExecutorService执行可调用任务,并使用Future获得结果

就像一样Runnable,您可以将提交Callable给执行者服务以执行。但是Callable的结果呢?您如何访问它?

submit()执行程序服务的方法将任务提交给线程执行。但是,它不知道所提交任务的结果何时可用。因此,它返回一种称为a的特殊类型的值,该值Future可用于在任务可用时获取任务的结果。

的概念Future类似于Java等其他语言中的Promise。它表示将在以后的较晚时间点完成的计算结果。

以下是Future和Callable的简单示例-

import java.util.concurrent.*;

public class FutureAndCallableExample {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		Callable<String> callable = () -> {
			// Perform some computation
			System.out.println("Entered Callable");
			Thread.sleep(2000);
			return "Hello from Callable";
		};

		System.out.println("Submitting Callable");
		Future<String> future = executorService.submit(callable);

		// This line executes immediately
		System.out.println("Do something else while callable is getting executed");

		System.out.println("Retrieve the result of the future");
		// Future.get() blocks until the result is available
		String result = future.get();
		System.out.println(result);

		executorService.shutdown();
	}

}
 
# Output
Submitting Callable
Do something else while callable is getting executed
Retrieve the result of the future
Entered Callable
Hello from Callable

ExecutorService.submit()方法立即返回并给您Future。一旦获得了未来,就可以在执行提交的任务时并行执行其他任务,然后使用future.get()方法来检索未来的结果。

请注意,该get()方法将阻塞直到任务完成。该FutureAPI还提供了一个isDone()方法来检查任务是否完成或不-

import java.util.concurrent.*;

public class FutureIsDoneExample {
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		ExecutorService executorService = Executors.newSingleThreadExecutor();

		Future<String> future = executorService.submit(() -> {
			Thread.sleep(2000);
			return "Hello from Callable";
		});

		while (!future.isDone()) {
			System.out.println("Task is still not done...");
			Thread.sleep(200);
		}

		System.out.println("Task completed! Retrieving the result");
		String result = future.get();
		System.out.println(result);

		executorService.shutdown();
	}
}
 
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Hello from Callable

取消未来

您可以取消将来的使用Future.cancel()方法。它尝试取消任务的执行,如果成功取消,则返回true,否则返回false。

cancel()方法接受布尔参数- mayInterruptIfRunning。如果传递true此参数的值,则当前正在执行任务的线程将被中断,否则将允许正在进行的任务完成。

您可以使用isCancelled()方法来检查任务是否被取消。同样,取消任务后,isDone()将始终为真。

import java.util.concurrent.*;

public class FutureCancelExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        long startTime = System.nanoTime();
        Future<String> future = executorService.submit(() -> {
            Thread.sleep(2000);
            return "Hello from Callable";
        });

        while(!future.isDone()) {
            System.out.println("Task is still not done...");
            Thread.sleep(200);
            double elapsedTimeInSec = (System.nanoTime() - startTime)/1000000000.0;

            if(elapsedTimeInSec > 1) {
                future.cancel(true);
            }
        }

        System.out.println("Task completed! Retrieving the result");
        String result = future.get();
        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task is still not done...
Task completed! Retrieving the result
Exception in thread "main" java.util.concurrent.CancellationException
        at java.util.concurrent.FutureTask.report(FutureTask.java:121)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at FutureCancelExample.main(FutureCancelExample.java:34)

如果运行上面的程序,它将抛出异常,因为如果取消任务,future.get()方法将抛出异常CancellationException。我们可以通过在获取结果之前检查是否取消了期货来处理这一事实-

if(!future.isCancelled()) {
    System.out.println("Task completed! Retrieving the result");
    String result = future.get();
    System.out.println(result);
} else {
    System.out.println("Task was cancelled");
}
 

添加超时

future.get()方法将阻止并等待任务完成。如果您在可调用任务中从远程服务调用API,并且远程服务已关闭,则future.get()它将永远阻塞,这将使应用程序无响应。

为了避免这种情况,您可以在get()方法中添加超时-

future.get(1, TimeUnit.SECONDS);

如果任务没有在指定的时间内完成,则该future.get()方法将抛出TimeoutException

invokeAll

提交多个任务,等待所有任务完成。

您可以通过将一组Callables传递给invokeAll()方法来执行多个任务。在invokeAll()返回的Future列表。future.get()在所有期货都完成之前,对的任何调用都将阻塞。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAllExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> task1 = () -> {
            Thread.sleep(2000);
            return "Result of Task1";
        };

        Callable<String> task2 = () -> {
            Thread.sleep(1000);
            return "Result of Task2";
        };

        Callable<String> task3 = () -> {
            Thread.sleep(5000);
            return "Result of Task3";
        };

        List<Callable<String>> taskList = Arrays.asList(task1, task2, task3);

        List<Future<String>> futures = executorService.invokeAll(taskList);

        for(Future<String> future: futures) {
            // The result is printed only after all the futures are complete. (i.e. after 5 seconds)
            System.out.println(future.get());
        }

        executorService.shutdown();
    }
}
 
# Output
Result of Task1
Result of Task2
Result of Task3

在上述程序中,对future.get()语句的首次调用会阻塞,直到所有期货均完成。即结果将在5秒钟后打印出来。

invokeAny

提交多个任务,等待其中任何一个完成

invokeAny()方法接受的集合Callables并返回最快的Callable的结果。请注意,它不会返回Future。

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

public class InvokeAnyExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        Callable<String> task1 = () -> {
            Thread.sleep(2000);
            return "Result of Task1";
        };

        Callable<String> task2 = () -> {
            Thread.sleep(1000);
            return "Result of Task2";
        };

        Callable<String> task3 = () -> {
            Thread.sleep(5000);
            return "Result of Task3";
        };

        // Returns the result of the fastest callable. (task2 in this case)
        String result = executorService.invokeAny(Arrays.asList(task1, task2, task3));

        System.out.println(result);

        executorService.shutdown();
    }
}
# Output
Result of Task2

结论

您可以在我的github存储库中找到本教程中使用的所有代码段。我鼓励您分叉存储库并自己练习程序。

上一篇:JAVA线程的那些事?


下一篇:JAVA多线程的三种实现方式