https://www.javacodegeeks.com/2014/07/java-ee-concurrency-api-tutorial.html
This is a sample chapter taken from the Practical Java EE 7 development on WildFlybook edited by Francesco Marchioni.
This chapter discusses about the new Java EE Concurrency API (JSR 236) which outlines a standard way for executing tasks in parallel on a Java EE Container using a set of Managed resources. In order to describe how to use this API in your applications, we will follow this roadmap:
- A short introduction to the Concurrency Utilities
- How to leverage asynchronous tasks using the ManagedExecutorService
- How to schedule tasks at specific times using the ManagedScheduledExecutorService
- How to create dynamic proxy objects which add contextual information available in Java EE environment
- How to use the ManagedThreadFactory to create managed threads to be used by your applications
Overview of Concurrency Utilities
Prior to Java EE 7 executing concurrent tasks within a Java EE Container was widely acknowledged as a dangerous practice and sometimes even prohibited by the container:
“The enterprise bean must not attempt to manage threads. The enterprise bean must not attempt to start, stop, suspend, or resume a thread, or to change a thread’s priority or name. The enterprise bean must not attempt to manage thread groups”
Actually, by creating your own un-managed Threads in a Java EE container, using the J2SE API, would not guarantee that the context of the container is propagated to the thread executing the task.
The only available pattern was either using Asynchronous EJB or Message Driven Bean, in order to execute a task in an asynchronous way; most often this was enough for simple fire and forget patterns, yet the control of Threads still lied in the hands of the Container.
With the Java EE Concurrency API (JSR 236) you can use extensions to the java.util.concurrent API as Managed Resources, that is, managed by the Container. The only difference from the standard J2SE programming is that you will retrieve your Managed resources from the JNDI tree of the Container. Yet you will still use your Runnable interfaces or classes that are part of the java.util.concurrent
package such as Future
or ScheduledFuture
.
In the next section, we will start from the simplest example, which is executing an asynchronous task using theManagedExecutorService
.
Using the ManagedExecutorService to submit tasks
In order to create our first asynchronous execution we will show how to use the ManagedExecutorService
, which extends the Java SE ExecutorService to provide methods for submitting tasks for execution in a Java EE environment. By using this managed service, the context of the container is propagated to the thread executing the task: The ManagedExecutorService is included as part of the EE configuration of the application server:
01 |
< subsystem xmlns = "urn:jboss:domain:ee:2.0" >
|
09 |
< managed-executor-services >
|
11 |
< managed-executor-service name = "default"
|
13 |
jndi-name = "java:jboss/ee/concurrency/executor/default"
|
15 |
context-service = "default" hung-task-threshold = "60000"
|
17 |
core-threads = "5" max-threads = "25" keepalive-time = "5000" />
|
19 |
</ managed-executor-services >
|
In order to create our first example, we retrieve the ManagedExecutorService from the JNDI context of the container as follows:
1 |
@Resource (name = "DefaultManagedExecutorService" )
|
3 |
ManagedExecutorService executor; |
By using the ManagedExecutorService instance, you are able to submit your tasks that can implement either thejava.lang.Runnable
interface or the java.util.concurrent.Callable
interface.
Instead of having a run()
method, the Callable interface offers a call()
method, which can return any generic type.
Coding a simple Asynchronous Task
So let’s see a simple Servlet example which fires an asynchronous task using the ManagedExecutorService:
01 |
@WebServlet ( "/ExecutorServlet" )
|
03 |
public class ExecutorServlet extends HttpServlet {
|
05 |
@Resource (name = "DefaultManagedExecutorService" )
|
07 |
ManagedExecutorService executor;
|
09 |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
11 |
PrintWriter writer = response.getWriter();
|
13 |
executor.execute( new SimpleTask());
|
15 |
writer.write( "Task SimpleTask executed! check logs" );
|
The class SimpleTask
in our example implements the Runnable
interface by providing concurrent execution.
01 |
public class SimpleTask implements Runnable {
|
06 |
System.out.println( "Thread started." );
|
Retrieving the result from the Asynchronous Task
The above Task is a good option for a down-to-earth scenario; as you might have noticed, there’s no way to intercept a return value from the Task. In addition, when using Runnable you are constrained to use unckecked exceptions (if run()threw a checked exception, who would catch it? There is no way for you to enclose that run() call in a handler, since you don’t write the code that invokes it).
If you want to overcome this limitations then you can implement a java.util.concurrent.Callable
interface instead, submit it to the ExecutorService, and waiting for result with FutureTask.isDone()
returned by the ExecutorService.submit()
.
Let’s see a new version of our Servlet, which captures the result of a Task named CallableTask
:
01 |
@WebServlet ( "/CallableExecutorServlet" )
|
03 |
public class CallableExecutorServlet extends HttpServlet {
|
05 |
@Resource (name = "DefaultManagedExecutorService" )
|
06 |
ManagedExecutorService executor;
|
08 |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
10 |
PrintWriter writer = response.getWriter();
|
12 |
Future<Long> futureResult = executor.submit( new CallableTask( 5 ));
|
14 |
while (!futureResult.isDone()) {
|
19 |
} catch (InterruptedException e) {
|
27 |
writer.write( "Callable Task returned " +futureResult.get());
|
29 |
} catch ( Exception e) {
|
As you can see from the code, we are polling for the task completion using the isDone() method. When the task is completed we can call the FutureTask’s get() method and get the return value.
Now let’s see our CallableTask
implementation which, in our example, returns the value of the summation of a number:
01 |
public class CallableTask implements Callable<Long> {
|
05 |
public CallableTask( int id) {
|
15 |
for ( int i = 1 ; i <= id; i++) {
|
21 |
return new Long(summation);
|
In our example, all we had to do is implementing the call
method, which returns the Integer that will be eventually collected via the get
method of the Future interface.
If your Callable task has thrown an Exception, then FutureTask.get()
will raise an Exception too and the original Exception can be accessed by using Exception.getCause()
Monitoring the state of a Future Task
In the above example, we are checking the status of the Future Task using the FutureTask.isDone()
method. If you need a more accurate control over the Future Task lifecycle, then you can implementjavax.enterprise.concurrent.ManagedTaskListener
instance in order to receive lifecycle event notifications.
Here’s our enhanced Task, which implements the taskSubmitting
, taskStarting
, taskDone
and taskAborted
methods:
01 |
public class CallableListenerTask implements Callable<Long>,ManagedTaskListener {
|
05 |
public CallableListenerTask( int id) {
|
15 |
for ( int i = 1 ; i <= id; i++) {
|
21 |
return new Long(summation);
|
25 |
public void taskSubmitted(Future<?> f, ManagedExecutorService es,
|
28 |
System.out.println( "Task Submitted! " +f);
|
32 |
public void taskDone(Future<?> f, ManagedExecutorService es, Object obj,
|
35 |
System.out.println( "Task DONE! " +f);
|
39 |
public void taskStarting(Future<?> f, ManagedExecutorService es,
|
42 |
System.out.println( "Task Starting! " +f);
|
46 |
public void taskAborted(Future<?> f, ManagedExecutorService es,
|
47 |
Object obj, Throwable exc) {
|
49 |
System.out.println( "Task Aborted! " +f);
|
The lifecycle notifications are invoked in this order:
-
taskSubmitting
: on Task submission to the Executor
-
taskStarting
: before the actual Task startup
-
taskDone
: trigger on Task completion
-
taskAborted
: triggered when the user invoked futureResult.cancel()
Using Transaction in asynchronous Tasks
Within a distributed Java EE environment, it is a challenging task to guarantee proper transaction execution also for concurrent task executions. The Java EE concurrency API relies on Java Transaction API (JTA) to support transactions on the top of its components via the javax.transaction.UserTransaction
which is used to explicitly demarcate transaction boundaries.
The following code shows how a Callable Task retrieves an UserTransaction from the JNDI tree and then starts and commit a transaction with an external component (an EJB):
01 |
public class TxCallableTask implements Callable<Long> {
|
05 |
public TxCallableTask( long i) {
|
15 |
UserTransaction tx = lookupUserTransaction();
|
17 |
SimpleEJB ejb = lookupEJB();
|
23 |
value = ejb.calculate(id); // Do Transactions here
|
27 |
} catch (Exception e) {
|
31 |
try { tx.rollback(); } catch (Exception e1) { e1.printStackTrace(); }
|
39 |
// Lookup EJB and UserTransaction here .. |
The major limit of this approach is that, although context objects can begin, commit, or roll back transactions, these objects cannot enlist in parent component transactions.
Scheduling tasks with the ManagedScheduledExecutorService
The ManagedScheduledExecutorService
extends the Java SE ScheduledExecutorService
to provide methods for submitting delayed or periodic tasks for execution in a Java EE environment. As for the other managed objects, you can obtain an instance of the ExecutorService via JNDI lookup:
1 |
@Resource (name = "DefaultManagedScheduledExecutorService" )
|
2 |
ManagedScheduledExecutorService scheduledExecutor; |
Once that you have a reference to the ExecutorService, then you can invoke the schedule
method on it to submit a delayed or periodic tasks. ScheduledExecutors, just like ManagedExecutors, can be bound either to a Runnable interface or to a Callable
interface. Next section shows both approaches.
Submitting a simple ScheduledTask
In its simplest form, submitting a Scheduled Task requires setting up a schedule expression and passing it to the ManagedSchedulerExecutor Service. In this example, we are creating a delayed task which will run just once, in 10 seconds, since the schedule() method is invoked:
01 |
@WebServlet ( "/ScheduledExecutor" )
|
02 |
public class ScheduledExecutor extends HttpServlet {
|
04 |
@Resource (name = "DefaultManagedScheduledExecutorService" )
|
05 |
ManagedScheduledExecutorService scheduledExecutor;
|
07 |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
09 |
PrintWriter writer = response.getWriter();
|
11 |
ScheduledFuture<?> futureResult = scheduledExecutor.schedule( new SimpleTask(), 10 ,TimeUnit.SECONDS);
|
13 |
writer.write( "Waiting 10 seconds before firing the task" );
|
If you need to schedule your task repeatedly, then you can use the scheduleAtFixedRate
method, which takes as input the time before firing the Task, the time before each repeated execution and the TimeUnit. See the following example, which schedules a Task every 10 seconds of seconds, after an initial delay of 1 second:
1 |
ScheduledFuture<?> futureResult = scheduledExecutor. scheduleAtFixedRate ( new SimpleTask(), 1 , 10 ,TimeUnit.SECONDS);
|
Capturing the result of a Scheduled execution
If you need to capture a return value from the task that is scheduled to be executed, then you can use theScheduledFuture
interface which is returned by the schedule method. Here’s an example which captures the result from our factorial example Task that we have earlier coded:
01 |
ScheduledFuture<Long> futureResult = |
03 |
scheduledExecutor.schedule( new CallableTask( 5 ), 5 , TimeUnit.SECONDS);
|
05 |
while (!futureResult.isDone()) {
|
09 |
Thread.sleep( 100 ); // Wait
|
11 |
} catch (InterruptedException e) {
|
21 |
writer.write( "Callable Task returned " +futureResult.get());
|
23 |
} catch ( Exception e) {
|
Creating Managed Threads using the ManagedThreadFactory
The javax.enterprise.concurrent.ManagedThreadFactory
is the equivalent of the J2SE ThreadFactory, which can be used to create your own Threads. In order to use the ManagedThreadFactory, you need to inject it from the JNDI as usual:
1 |
@Resource (name = "DefaultManagedThreadFactory" )
|
3 |
ManagedThreadFactory factory; |
The main advantage of creating your own Managed Threads from a Factory (compared with those created by the ManagedExecutorService) is that you can set some typical Thread properties (such as name or priority) and that you can create a managed version of the J2SE Executor Service. The following examples will show you how.
Creating Managed Threads from a Factory
In this example, we will create and start a new Thread using the DefaultManagedThreadFactory
. As you can see from the code, once that we have created an instance of a Thread class, we are able to set a meaningful name for it and associate it with a priority. We will then associate the Thread with our SimpleTask that logs some data on the console:
01 |
@WebServlet ( "/FactoryExecutorServlet" )
|
03 |
public class FactoryExecutorServlet extends HttpServlet {
|
05 |
@Resource (name = "DefaultManagedThreadFactory" )
|
06 |
ManagedThreadFactory factory;
|
08 |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
10 |
PrintWriter writer = response.getWriter();
|
12 |
Thread thread = factory.newThread( new SimpleTask());
|
14 |
thread.setName( "My Managed Thread" );
|
16 |
thread.setPriority(Thread.MAX_PRIORITY);
|
20 |
writer.write( "Thread started. Check logs" );
|
Now check your server logs: no doubt that it is easier to detect the output of your self-created Threads:
1 |
14:44:31,838 INFO [stdout] (My Managed Thread) Simple Task started |
Collecting information about the Thread name is especially useful when analyzing a thread dump and the thread name is the only clue to trace a thread execution path.
Using a Managed Executor Service
The java.util.concurrent.ExecutorService
interface is a standard J2SE mechanism, which has vastly replaced the usage of direct Threads to perform asynchronous executions. One of the main advantages of the ExecutorService over the standard Thread mechanism is that you can define a pool of instances to execute your jobs and that you have a safer way to interrupt your jobs.
Using the ExecutorService in your Enterprise applications is straightforward: all you have to do is passing an instance of yourManaged ThreadFactory
to a constructor of your ExecutorService
. In the following example, we are using a SingletonEJB to provide the ExecutorService as a service in its method getThreadPoolExecutor
:
03 |
public class PoolExecutorEJB {
|
05 |
private ExecutorService threadPoolExecutor = null ;
|
11 |
long keepAliveTime = 5000 ;
|
13 |
@Resource (name = "DefaultManagedThreadFactory" )
|
14 |
ManagedThreadFactory factory;
|
16 |
public ExecutorService getThreadPoolExecutor() {
|
18 |
return threadPoolExecutor;
|
25 |
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize, maxPoolSize,
|
27 |
keepAliveTime, TimeUnit.SECONDS,
|
29 |
new ArrayBlockingQueue<Runnable>( 10 ), factory);
|
34 |
public void releaseResources() {
|
36 |
threadPoolExecutor.shutdown();
|
The ThreadPoolExecutor contains two core parameters in its constructor: the corePoolSize
and the maximumPoolSize
. When a new task is submitted in method and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full.
The ExecutorService
is then used to start a new asynchronous task as in the following example, where an anonymous implementation of Runnable is provided in a Servlet:
01 |
@WebServlet ( "/FactoryExecutorServiceServlet" )
|
02 |
public class FactoryExecutorServiceServlet extends HttpServlet {
|
04 |
@EJB PoolExecutorEJB ejb;
|
06 |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
08 |
final PrintWriter writer = response.getWriter();
|
10 |
writer.write( "Invoking ExecutorService. Check Logs." );
|
12 |
ExecutorService executorService = ejb.getThreadPoolExecutor();
|
14 |
executorService.execute( new Runnable() {
|
18 |
System.out.println( "Message from your Executor!" );
|
As soon as the PoolExecutorEJB
is terminated, the ExecutorService will be finalized as well in the @PreDestroy
method of the Singleton Bean which will invoke the shutdown()
method of the ThreadPoolExecutor. The ExecutorService will not shut down immediately, but it will no longer accept new tasks, and once all threads have finished current tasks, the ExecutorService shuts down.
Using Dynamic Contextual objects
A dynamic proxy is an useful Java tweak that can be used create dynamic implementations of interfaces using thejava.lang.reflect.Proxy
API. You can use dynamic proxies for a variety of different purposes such as database connection and transaction management, dynamic mock objects for unit testing and other AOP-like method intercepting purposes.
In a Java EE Environment, you can use a special type of dynamic proxies called dynamic contextual proxies.
The most interesting feature of dynamic contextual objects is that the JNDI naming context, classloader, and security context are propagated to the proxied objects. This can be useful in a context where you are bringing J2SE implementations in your Enterprise applications and want to run them within the context of the container.
The following snippet shows how to inject contextual objects into the container. Since contextual objects also need an ExecutorService to which you can submit the task, a ThreadFactory is injected as well:
1 |
@Resource (name = "DefaultContextService" )
|
5 |
@Resource (name = "DefaultManagedThreadFactory" )
|
7 |
ManagedThreadFactory factory; |
In the following section, we will show how to create dynamic contextual objects using a revised version of our Singleton EJB.
Executing Contextual Tasks
The following example shows how to trigger a contextual proxy for a Callable
task. For this purpose, we will need both the ManagedThreadfactory and the ContextService. Our ContextExecutor EJB will initially create the ThreadPoolExecutor within itsinit
method. Then, within the submit method, new contextual proxies for Callable tasks are created and submitted to the ThreadPool Executor.
Here is the code for our ContextExecutorEJB
:
03 |
public class ContextExecutorEJB {
|
05 |
private ExecutorService threadPoolExecutor = null ;
|
07 |
@Resource (name = "DefaultManagedThreadFactory" )
|
08 |
ManagedThreadFactory factory;
|
10 |
@Resource (name = "DefaultContextService" )
|
13 |
public ExecutorService getThreadPoolExecutor() {
|
15 |
return threadPoolExecutor;
|
21 |
threadPoolExecutor = new ThreadPoolExecutor( 5 , 10 , 5 , TimeUnit.SECONDS,
|
23 |
new ArrayBlockingQueue>Runnable>( 10 ), factory);
|
26 |
public Future>Long> submitJob(Callable>Long> task) {
|
28 |
Callable>Long> proxy = cs.createContextualProxy(task, Callable. class );
|
30 |
return getThreadPoolExecutor().submit(proxy);
|
The CallableTask class is a bit more complex than our first example, as it is going to log information about thejavax.security.auth.Subject
, which is contained in the caller Thread:
01 |
public class CallableTask implements Callable<Long> {
|
05 |
public CallableTask( int id) {
|
17 |
Subject subject = Subject.getSubject(AccessController.getContext());
|
19 |
logInfo(subject, summation); // Log Traces Subject identity
|
21 |
return new Long(summation);
|
25 |
private void logInfo(Subject subject, long summation) { . . }
|
Following here is a simple way to submit new contextual tasks to our SingletonEJB:
1 |
@EJB ContextExecutorEJB ejb;
|
3 |
protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
|
5 |
CallableTask task = new CallableTask( 5 );
|
Building your examples
In order to use the Concurrency utilities for Java EE API you need the following Maven dependency in your application:
3 |
< groupId >org.jboss.spec.javax.enterprise.concurrent</ groupId >
|
5 |
< artifactId >jboss-concurrency-api_1.0_spec</ artifactId >
|
7 |
< version >1.0.0.Final</ version >
|
This excerpt has been taken from the “Practical Java EE 7 development on WildFly” book which is a hands-on practical guide disclosing all areas of Java EE 7 development on the newest WildFly application server. Covers everything from the foundation components (EJB, Servlets, CDI, JPA) to the new technology stack defined in Java Enterprise Edition 7 hence including the new Batch API, JSON-P Api, the Concurrency API,Web Sockets, the JMS 2.0 API, the core Web services stack (JAX-WS, JAX-RS). The testing area with Arquillian framework and the Security API complete the list of topics discussed in the book.