Welcome to this tutorial. In this tutorial we will discuss about creating our own ThreadPool by limiting no. of threads and pool size. We have 2 class called ThreadPoolTester and MyThreadPool where in ThreadPoolTester we are implementing the run() method and returning the Runabble instance. Each instance are managed by other class called MyThreadPool which extends the ThreadGroup class. Below are those 2 classes sample code and sample output.
public class ThreadPoolTester {
public static void main(String[] args) {
// Assign your no. of thread to create
int noOfProcess = 5;
// Assign your own thread pool size
int poolMaxSize = 2;
// Creating Threadpool with the thread size given above
MyThreadPool threadPool = new MyThreadPool(poolMaxSize);
// Creating threadpool Object
ThreadPoolTester obj = new ThreadPoolTester();
long time1 = getTime();
for (int i = 0; i < noOfProcess; i++) {
threadPool.process(obj.startProcess(i));
/**
* Just for showing threadpool not empty we have
* place this while loop and making thread to sleep
* In realtime while loop should not there, since it
* will create performance issue.
*/
while (threadPool.getTaskListSize() >= poolMaxSize) {
try {
System.out.println("Threadpool NOT Empty - " + threadPool.getTaskListSize());
Thread.sleep(1000);
System.out.println("Threadpool size after sleeping - " + threadPool.getTaskListSize());
} catch (Exception e) {
e.printStackTrace();
}
}
}
// Finally waiting for all thread to complete its process
threadPool.join();
long time2 = getTime();
System.out.println("Time Taken ::: " + (time2 - time1) +" millisecond(s)");
}
private Runnable startProcess(final int taskID) {
return new Runnable() {
public void run() {
System.out.println("Task " + taskID + ": start");
// Putting each task to wait for random milli seconds
try {
Thread.sleep(new Random().nextInt(2000));
} catch (InterruptedException ex) {
}
System.out.println("Task " + taskID + ": end");
}
};
}
public static long getTime() {
Calendar currentDate = Calendar.getInstance();
long time = currentDate.getTimeInMillis();
return time;
}
}
/**
* Main ThreadPool class where we are extending ThreadGroup
* to implement our own pooling
*/
public class MyThreadPool extends ThreadGroup {
private boolean active = false;
private LinkedList<Runnable> qList = null;
private int tId = 0;
private static int tPoolID = 0;
public MyThreadPool(int numThreads) {
super("Pool - " + (tPoolID++));
setDaemon(true);
active = true;
qList = new LinkedList<Runnable>();
for (int i = 0; i < numThreads; i++) {
new PooledThread().start();
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public synchronized void process(Runnable task) {
if (!active) {
throw new IllegalStateException();
}
if (task != null) {
qList.add(task);
notify();
}
}
protected synchronized Runnable getTask() throws InterruptedException {
while (qList.size() == 0) {
if (!active) {
return null;
}
wait();
}
return (Runnable) qList.removeFirst();
}
protected int getTaskListSize() {
return qList.size();
}
public synchronized void close() {
if (active) {
active = false;
qList.clear();
interrupt();
}
}
public void join() {
synchronized (this) {
active = false;
notifyAll();
}
Thread[] threads = new Thread[activeCount()];
int count = enumerate(threads);
for (int i = 0; i < count; i++) {
try {
threads[i].join();
} catch (InterruptedException ex) {
}
}
}
private class PooledThread extends Thread {
public PooledThread() {
super(MyThreadPool.this, "Task -" + (tId++));
}
public void run() {
while (!isInterrupted()) {
Runnable process = null;
try {
process = getTask();
} catch (InterruptedException ex) {}
if (process == null) {
return;
}
try {
process.run();
} catch (Throwable t) {
uncaughtException(this, t);
}
}
}
}
}
OUTPUT:
Task 0: start
Threadpool NOT Empty - 2
Task 1: start
Task 1: end
Task 2: start
Task 0: end
Threadpool size after sleeping - 0
Threadpool NOT Empty - 2
Task 3: start
Task 3: end
Task 4: start
Threadpool size after sleeping - 0
Task 2: end
Task 4: end
Time Taken ::: 3285 millisecond(s)
2 comments:
Write comments