Saturday, March 3, 2012

Schedule jobs with WorkManager

Leveraging EJB Timers for J2EE Concurrency
http://www.devx.com/Java/Article/33694/1954

The Work Manager API: Parallel Processing Within a J2EE Container
http://www.devx.com/Java/Article/28815/0


Using Work Managers to Optimize Scheduled Work
http://docs.oracle.com/cd/E14571_01/web.1111/e13701/self_tuned.htm#CNFGD112

Using CommonJ With WebLogic Server
http://docs.oracle.com/cd/E14571_01/web.1111/e13701/self_tuned.htm#i1069944


Spring WorkManagerTaskExecutor (CommonJ)
http://static.springsource.org/spring/docs/2.5.x/api/org/springframework/scheduling/commonj/WorkManagerTaskExecutor.html

Package commonj.work
http://docs.oracle.com/cd/E14571_01/apirefs.1111/e13941/commonj/work/package-summary.html

commonj.work.WorkManager

The WorkManager is the abstraction for dispatching and monitoring asynchronous work, and it is a factory for creating short-lived or long-lived application Works.

WorkManagers are created by the server administrator. The vendor-specific systems management console allows the administrator to create one or more WorkManagers and associate a JNDI name with each one. The administrator may specify implementation-specific information such as min/max Works for each WorkManager. An application that requires a WorkManager should declare a resource-ref in the EJB or webapp that needs the WorkManager. The vendor descriptor editor or J2EE IDE can be used to bind this resource-ref to a physical WorkManager at deploy or development time. An EJB or servlet can then get a reference to a WorkManager by looking up the resource-ref name in JNDI and then casting it. For example, if the resource-ref was called wm/WorkManager:
<resource-ref>
  <res-ref-name>wm/WorkManager</res-ref-name>
  <res-type>commonj.work.WorkManager</res-type>
  <res-auth>Container</res-auth>
  <res-sharing-scope>Shareable</res-sharing-scope>
</resource-ref>


The Java code to look this up would look like:
InitialContext ic = new InitialContext();
WorkManager wm = (WorkManager) ic.lookup("java:comp/env/wm/WorkManager");
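
The lookup can fail with a NamingException, for example when the resource-ref is not bound. A minimal sketch of a guarded lookup follows. The class name JndiLookup and the method lookupOrNull are hypothetical helpers, not part of CommonJ; the sketch uses only javax.naming from the JDK.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

// Hypothetical helper, not part of CommonJ or WebLogic.
final class JndiLookup {

    private JndiLookup() {
    }

    // Looks up a JNDI name and casts the result to the requested type.
    // Returns null if the context cannot be created or the name is not bound.
    static <T> T lookupOrNull(String jndiName, Class<T> type) {
        try {
            InitialContext ic = new InitialContext();
            return type.cast(ic.lookup(jndiName));
        } catch (NamingException e) {
            System.err.println("JNDI lookup failed for " + jndiName + ": " + e);
            return null;
        }
    }
}
```

Inside a container, a call such as JndiLookup.lookupOrNull("java:comp/env/wm/WorkManager", WorkManager.class) would still need a null check before any Work is scheduled.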


The res-auth and res-sharing-scope elements are ignored in this version of the specification. The EJB or servlet can then use the WorkManager as it needs to.
When a Work is scheduled, the declared context that is present on the thread (the J2EE context) will be saved and propagated to the asynchronous methods that are executed. This J2EE context at minimum will contain the java:comp namespace and ClassLoader of the scheduler unless specified otherwise. Other J2EE contexts such as security or a transactional context may be optionally added by the application server vendor. Global transactions are always available using the java:comp/UserTransaction JNDI name and are used in the same fashion as they are used in servlets and bean-managed transaction Enterprise Java Beans.

A WorkManager can also be a pinned WorkManager. This is a WorkManager obtained using the RemoteWorkItem.getWorkManager method. It allows subsequently scheduled Works to be sent to the same remote WorkManager as the one that is associated with the RemoteWorkItem. Pinned WorkManagers are only supported on vendor implementations that support remote Works. Applications that follow the programming model will work on all implementations; however, on implementations without remote support, serializable Works will be executed within the local JVM only.

If the scheduled Work is a daemon Work, then the life-cycle of that Work is tied to the application that scheduled it. If the application is stopped, the Work.release() method will be called.
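
Calling release() does not by itself stop a running Work; the Work has to cooperate by returning from run(). The following is a minimal sketch of that pattern, assuming a flag that is checked inside the loop. To keep it runnable without the CommonJ jar, the hypothetical StoppableCounter class implements Runnable and only mirrors the three Work methods (run, release, isDaemon); in a real deployment it would implement commonj.work.Work instead.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical example class. It mirrors run/release/isDaemon from
// commonj.work.Work but implements Runnable so it compiles without CommonJ.
class StoppableCounter implements Runnable {

    private final AtomicBoolean released = new AtomicBoolean(false);
    private final long limit;
    private volatile long iterations;

    StoppableCounter(long limit) {
        this.limit = limit;
    }

    @Override
    public void run() {
        // Check the flag on every iteration so a release() call takes effect quickly.
        while (!released.get() && iterations < limit) {
            iterations++;
        }
    }

    // Asks the running loop to stop; run() returns at its next flag check.
    public void release() {
        released.set(true);
    }

    // A daemon Work is tied to the life cycle of the application that scheduled it.
    public boolean isDaemon() {
        return true;
    }

    public long getIterations() {
        return iterations;
    }
}
```

A long computation that never looks at such a flag keeps running after release(), so the check belongs in the innermost loop that is cheap enough to poll.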


Using the Job Scheduler
http://docs.oracle.com/cd/E14571_01/web.1111/e13733/toc.htm#i1058048

WorkManagerService
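package dave;

import java.util.Collection;

import javax.ejb.Remote;

@Remote
public interface WorkManagerService {

    // Schedules a Work that only logs which work manager executes it.
    public void processJob();

    // Schedules a single task and returns without waiting for it.
    public void processTask(Task task);

    // Schedules all tasks, waits for them, and returns the finished ones.
    public Collection<Task> processTasks(Collection<Task> tasks);

}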




CommonJWorkManager
package dave;

import java.util.ArrayList;
import java.util.Collection;

import javax.annotation.Resource;
import javax.ejb.LocalBean;
import javax.ejb.Stateless;

import weblogic.work.ExecuteThread;

import commonj.work.Work;
import commonj.work.WorkEvent;
import commonj.work.WorkException;
import commonj.work.WorkItem;
import commonj.work.WorkManager;

/**
 * Session Bean implementation class CommonJWorkManager
 */
@Stateless(mappedName = "WorkManagerService")
@LocalBean
public class CommonJWorkManager implements WorkManagerService {

    // CommonJ work manager; "daveWM" is defined in weblogic-ejb-jar.xml.
    @Resource(name = "daveWM")
    WorkManager wm;

    /**
     * Default constructor.
     */
    public CommonJWorkManager() {
    }

    // Schedules an anonymous Work that only logs which work manager runs it.
    @Override
    public void processJob() {
        try {
            System.out.println("## [CommonJWorkManager] executing in: "
                    + ((ExecuteThread) Thread.currentThread()).getWorkManager()
                            .getName());
            wm.schedule(new Work() {
                public void run() {
                    ExecuteThread th = (ExecuteThread) Thread.currentThread();
                    System.out
                            .println("## [CommonJWorkManager] self-tuning workmanager: "
                                    + th.getWorkManager().getName());
                }

                public void release() {
                }

                public boolean isDaemon() {
                    return false;
                }
            });
        } catch (WorkException e) {
            e.printStackTrace();
        }
    }

    // Schedules a single task and returns immediately, without waiting for it.
    @Override
    public void processTask(Task task) {
        System.out.println("Plan task " + task.getName());

        try {
            System.out.println("## [CommonJWorkManager] executing in: "
                    + ((ExecuteThread) Thread.currentThread()).getWorkManager()
                            .getName());

            wm.schedule(new FibWork(task));
        } catch (WorkException e) {
            e.printStackTrace();
        }
    }

    // Schedules one FibWork per task, waits up to 60 seconds for all of them,
    // and returns the tasks of the WorkItems that are completed or rejected by then.
    @Override
    public Collection<Task> processTasks(Collection<Task> tasks) {
        Collection<WorkItem> workItems = new ArrayList<WorkItem>();
        Collection<Task> out = new ArrayList<Task>();

        try {
            System.out.println("## [CommonJWorkManager] executing in: "
                    + ((ExecuteThread) Thread.currentThread()).getWorkManager()
                            .getName());

            for (Task task : tasks) {
                System.out.println("Plan task " + task.getName());
                WorkItem workItem = wm.schedule(new FibWork(task));
                workItems.add(workItem);
            }
            System.out.println("wm.waitForAll");
            // Timeout in milliseconds; Works still running afterwards keep running.
            wm.waitForAll(workItems, 60000);
            for (WorkItem workItem : workItems) {
                // Note: the value printed here is the WorkItem status code.
                System.out.println("workItem.result=" + workItem.getStatus());
                if (workItem.getStatus() == WorkEvent.WORK_COMPLETED
                        || workItem.getStatus() == WorkEvent.WORK_REJECTED) {
                    FibWork work = (FibWork) workItem.getResult();
                    if (work != null) {
                        out.add(work.getTask());
                        System.out.println("workItem task"
                                + work.getTask().getName() + " result="
                                + ((FibonacciTask) work.getTask()).getResult());
                    }
                }
            }
        } catch (WorkException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return out;
    }

}



FibWork
package dave;

import weblogic.work.ExecuteThread;
import commonj.work.Work;

// Wraps a Task so that it can be scheduled on a CommonJ WorkManager.
public class FibWork implements Work {

    private Task task;

    public FibWork(Task task) {
        this.task = task;
    }

    public Task getTask() {
        return task;
    }

    public void setTask(Task task) {
        this.task = task;
    }

    @Override
    public void run() {
        // WebLogic-specific: the executing thread knows its work manager.
        ExecuteThread th = (ExecuteThread) Thread.currentThread();
        System.out.println("## [FibWork] self-tuning workmanager: "
                + th.getWorkManager().getName());
        System.out.println("Running task " + task.getName());
        task.run();
    }

    @Override
    public boolean isDaemon() {
        return false;
    }

    @Override
    public void release() {
        // No-op: this Work does not support early termination.
    }

}
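
The listings use Task and FibonacciTask, which are not shown in this post. Below is a minimal sketch of what they could look like, inferred only from the calls made on them (getName(), run(), getN(), getResult()) and from the task names in the output; the actual classes may differ. Both are assumed to be Serializable because they cross the remote EJB interface, and the naive recursion is an assumption as well.

```java
import java.io.Serializable;

// Assumed shape of Task, inferred from task.getName() and task.run().
interface Task extends Runnable, Serializable {
    String getName();
}

// Assumed implementation; the real class may compute the result differently.
class FibonacciTask implements Task {

    private static final long serialVersionUID = 1L;

    private final int n;
    private long result;

    FibonacciTask(int n) {
        this.n = n;
    }

    @Override
    public String getName() {
        return "FibonacciTask " + n;
    }

    @Override
    public void run() {
        result = fib(n);
    }

    public int getN() {
        return n;
    }

    public long getResult() {
        return result;
    }

    // Naive recursion: exponential time, which makes large n deliberately slow.
    private static long fib(int k) {
        return k < 2 ? k : fib(k - 1) + fib(k - 2);
    }
}
```

In the dave package each type would go into its own file and be declared public, like the other listings.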



FibonacciCalculatorBean
package dave;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import javax.ejb.EJB;
import javax.ejb.Stateless;

/**
 * Session Bean implementation class FibonacciCalculatorBean
 */
@Stateless(mappedName = "FibonacciCalculator")
public class FibonacciCalculatorBean implements FibonacciCalculator {

    @EJB(mappedName = "WorkManagerService")
    private WorkManagerService workManagerService;

    public void calculateFibonacci(int[] numbers) {
        long initialTime = System.currentTimeMillis();
        List<Task> tasks = new ArrayList<Task>();
        // Starts at index 1: the client leaves numbers[0] unset.
        for (int i = 1; i < numbers.length; i++) {
            FibonacciTask fibonacciTask = new FibonacciTask(numbers[i]);
            tasks.add(fibonacciTask);
        }
        // Blocks until the work manager service returns the finished tasks.
        Collection<Task> resultTasks = workManagerService.processTasks(tasks);
        long elapsedTime = System.currentTimeMillis() - initialTime;
        printFibonacciResults(resultTasks, elapsedTime);
    }

    private void printFibonacciResults(Collection<Task> tasks, long elapsedTime) {
        System.out.println("** Completed Fibonacci Computations in " + elapsedTime
                + "ms **");
        for (Task task : tasks) {
            FibonacciTask ft = (FibonacciTask) task;
            System.out.println("Fibonacci(" + ft.getN() + ") = " + ft.getResult());
        }
    }

}



weblogic-ejb-jar.xml
<?xml version="1.0" encoding="UTF-8"?>
<wls:weblogic-ejb-jar
    xmlns:wls="http://xmlns.oracle.com/weblogic/weblogic-ejb-jar"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_1.xsd http://xmlns.oracle.com/weblogic/weblogic-ejb-jar http://xmlns.oracle.com/weblogic/weblogic-ejb-jar/1.3/weblogic-ejb-jar.xsd">
    <!--weblogic-version:12.1.1-->
    <wls:weblogic-enterprise-bean>
        <wls:ejb-name>CommonJWorkManager</wls:ejb-name>
        <wls:stateless-session-descriptor></wls:stateless-session-descriptor>
        <wls:transaction-descriptor>
            <wls:trans-timeout-seconds>300</wls:trans-timeout-seconds>
        </wls:transaction-descriptor>
        <wls:dispatch-policy>daveWM</wls:dispatch-policy>
    </wls:weblogic-enterprise-bean>

    <wls:work-manager>
        <wls:name>daveWM</wls:name>

        <wls:max-threads-constraint>
            <wls:name>MaxThreadsCountTen</wls:name>
            <wls:count>10</wls:count>
        </wls:max-threads-constraint>
    </wls:work-manager>
</wls:weblogic-ejb-jar>


Client
package dave;

import java.util.Hashtable;

import javax.naming.Context;
import javax.naming.InitialContext;

public class FibonacciClient {

    public static void main(String[] args) {
        try {
            // Connection settings for a local WebLogic server (example credentials).
            Hashtable<String, String> env = new Hashtable<String, String>();
            env.put(Context.INITIAL_CONTEXT_FACTORY,
                    "weblogic.jndi.WLInitialContextFactory");
            env.put(Context.SECURITY_PRINCIPAL, "weblogic");
            env.put(Context.SECURITY_CREDENTIALS, "weblogic123");
            env.put(Context.PROVIDER_URL, "t3://localhost:7001");
            Context ctx = new InitialContext(env);
            System.out.println("Initial Context created");

            /*TimerSession timerSession = (TimerSession) ctx
                    .lookup("TimerSession#dave.TimerSession");
            timerSession.createTimer(100000);*/

            // WebLogic global JNDI name: mappedName#fully.qualified.Interface
            FibonacciCalculator fibonacciCalculator = (FibonacciCalculator) ctx
                    .lookup("FibonacciCalculator#dave.FibonacciCalculator");
            System.out.println("lookup successful");
            System.out.println("Calling EJB method . . .");
            int fibMax = 10;
            int[] numbers = new int[fibMax];
            // Fills indexes 1..fibMax-1; index 0 stays unset and is skipped by the bean.
            for (int i = 1; i < fibMax; i++) {
                numbers[i] = i;
            }
            fibonacciCalculator.calculateFibonacci(numbers);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}



Output
## [CommonJWorkManager] executing in: default
Plan task FibonacciTask 1
Plan task FibonacciTask 2
Plan task FibonacciTask 3
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 1
Plan task FibonacciTask 4
## [FibWork] self-tuning workmanager: daveWM
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 2
Running task FibonacciTask 3
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 4
Plan task FibonacciTask 5
Plan task FibonacciTask 6
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 5
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 6
Plan task FibonacciTask 7
Plan task FibonacciTask 8
## [FibWork] self-tuning workmanager: daveWM
Plan task FibonacciTask 9
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 8
## [FibWork] self-tuning workmanager: daveWM
Running task FibonacciTask 9
Running task FibonacciTask 7
wm.waitForAll
workItem.result=4
workItem taskFibonacciTask 1 result=1
workItem.result=4
workItem taskFibonacciTask 2 result=1
workItem.result=4
workItem taskFibonacciTask 3 result=2
workItem.result=4
workItem taskFibonacciTask 4 result=3
workItem.result=4
workItem taskFibonacciTask 5 result=5
workItem.result=4
workItem taskFibonacciTask 6 result=8
workItem.result=4
workItem taskFibonacciTask 7 result=13
workItem.result=4
workItem taskFibonacciTask 8 result=21
workItem.result=4
workItem taskFibonacciTask 9 result=34
** Completed Fibonacci Computations in 17ms **
Fibonacci(1) = 1
Fibonacci(2) = 1
Fibonacci(3) = 2
Fibonacci(4) = 3
Fibonacci(5) = 5
Fibonacci(6) = 8
Fibonacci(7) = 13
Fibonacci(8) = 21
Fibonacci(9) = 34
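
The numeric workItem.result values in this output are WorkItem status codes. A small helper can make them readable. This sketch hard-codes the values that the commonj.work.WorkEvent constants are understood to have (WORK_ACCEPTED = 1, WORK_REJECTED = 2, WORK_STARTED = 3, WORK_COMPLETED = 4) so that it runs without the CommonJ jar; that mapping is an assumption to verify against the Javadoc, and the class name WorkStatusNames is hypothetical.

```java
// Hypothetical helper; the numeric values are assumed to match
// the constants in commonj.work.WorkEvent.
final class WorkStatusNames {

    private WorkStatusNames() {
    }

    // Translates a WorkItem status code into the name of the matching constant.
    static String describe(int status) {
        switch (status) {
            case 1:
                return "WORK_ACCEPTED";
            case 2:
                return "WORK_REJECTED";
            case 3:
                return "WORK_STARTED";
            case 4:
                return "WORK_COMPLETED";
            default:
                return "UNKNOWN(" + status + ")";
        }
    }
}
```

In application code, comparing against the WorkEvent constants directly, as processTasks does, is safer than relying on the raw numbers.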



WorkItems not finished on timeout (status 3, WORK_STARTED)
workItem.result=4
workItem taskFibonacciTask 41 result=165580141
workItem.result=4
workItem taskFibonacciTask 42 result=267914296
workItem.result=4
workItem taskFibonacciTask 43 result=433494437
workItem.result=3
workItem.result=3
workItem.result=3
workItem.result=3
workItem.result=3
workItem.result=3
** Completed Fibonacci Computations in 60115ms **
Fibonacci(1) = 1
Fibonacci(2) = 1
Fibonacci(3) = 2
Fibonacci(4) = 3
Fibonacci(5) = 5
Fibonacci(6) = 8
Fibonacci(7) = 13
Fibonacci(8) = 21
Fibonacci(9) = 34
Fibonacci(10) = 55
Fibonacci(11) = 89
Fibonacci(12) = 144
Fibonacci(13) = 233






[Screenshot: Requests waiting for processing]

[Screenshot: All threads]

Threads used
## [CommonJWorkManager] executing in: default
Plan task FibonacciTask 1
Plan task FibonacciTask 2
## [FibWork][ACTIVE] ExecuteThread: '2' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 1
Plan task FibonacciTask 3
Plan task FibonacciTask 4
## [FibWork][ACTIVE] ExecuteThread: '15' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 3
## [FibWork][ACTIVE] ExecuteThread: '31' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 2
Plan task FibonacciTask 5
## [FibWork][ACTIVE] ExecuteThread: '20' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 4
## [FibWork][ACTIVE] ExecuteThread: '29' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 5
Plan task FibonacciTask 6
Plan task FibonacciTask 7
## [FibWork][ACTIVE] ExecuteThread: '6' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 6
## [FibWork][ACTIVE] ExecuteThread: '30' for queue: 'weblogic.kernel.Default (self-tuning)' self-tuning workmanager: daveWM
Running task FibonacciTask 7
Plan task FibonacciTask 8
Plan task FibonacciTask 9

Comments:

  1. Hi Daniel,
    Thanks a lot for your great articles.
    Is it possible to share the source code of your FibonacciTask and Task classes?

    Cheers,
    Kim

  2. Hi Daniel,
    Is there any way to cancel or stop the computation after a defined timeout?

    wm.waitForAll(workItems, 60000);
    That is, loop over the workItems and, for those whose status is neither completed nor rejected, stop the ongoing computation?
    Something similar to calling get(60, TimeUnit.SECONDS) on a Future object and, on a TimeoutException, calling the cancel method to stop the process immediately?

    If not, I'm thinking about combining the WorkManager with Future objects.

    Thank you so much in advance.
    Cheers,
    Kim
