What is ThreadPool?
A ThreadPool is a pool that reuses a fixed number of threads to execute tasks.
At any point, at most nThreads threads are actively processing tasks. If additional tasks are submitted while all threads are busy, they wait in the queue until a thread becomes available.
The ThreadPool implementation internally uses a LinkedBlockingQueue for adding and removing tasks.
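For comparison, the behaviour described above is also what the JDK's own Executors.newFixedThreadPool offers. The sketch below is not the ThreadPool class built in this post; the class name FixedPoolDemo and the helper runTasks are made up for illustration. It submits more tasks than there are threads and counts how many of them complete.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedPoolDemo {

    // Runs taskCount trivial tasks on a pool of nThreads threads and
    // returns how many of them completed.
    static int runTasks(int nThreads, int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(nThreads);
        AtomicInteger completed = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            // Tasks beyond nThreads wait in the pool's internal queue.
            pool.execute(completed::incrementAndGet);
        }
        pool.shutdown();                              // no new tasks are accepted
        pool.awaitTermination(10, TimeUnit.SECONDS);  // wait for queued tasks to finish
        return completed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Completed tasks: " + runTasks(4, 10));
    }
}
```

Writing a pool by hand, as the rest of this post does, is mainly useful for understanding what such an executor does internally.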
How does ThreadPool work?
When a ThreadPool is instantiated, its constructor creates and starts nThreads threads.
/* Create a ThreadPool of size 4. */
ThreadPool pool = new ThreadPool(4);
Here, 4 threads are created and started in the ThreadPool. Each thread then enters the run() method of the PoolWorker class and calls poll() on the queue.
If a task is available, the thread executes it by calling the task's run() method (every task implements Runnable); otherwise it waits for a task to become available.
public void run() {
    Runnable task;
    while (true) {
        synchronized (queue) {
            while (queue.isEmpty()) {
                try {
                    queue.wait();
                } catch (InterruptedException e) {
                    System.out.println("An error while queue is waiting: " + e.getMessage());
                }
            }
            task = queue.poll();
        }
        // If we don't catch RuntimeException, the pool could leak threads
        try {
            task.run();
        } catch (RuntimeException e) {
            System.out.println("Thread pool is interrupted: " + e.getMessage());
        }
    }
}
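Because LinkedBlockingQueue is itself a thread-safe, blocking queue, the explicit synchronized/wait() loop is not strictly required. As a hedged alternative (not the code used in this post), a worker could call take(), which blocks until a task is available. The names BlockingWorkerDemo, runWithTake and stopSignal are hypothetical, and the sketch stops its workers with a "poison pill" marker instead of the shutdown flag described later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingWorkerDemo {

    // Starts nThreads workers that drain taskCount tasks with take(),
    // then stops them and returns how many tasks were executed.
    static int runWithTake(int nThreads, int taskCount) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger executed = new AtomicInteger();
        // Marker object (an assumption of this sketch) that tells a worker to stop.
        Runnable stopSignal = () -> { };

        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < nThreads; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        Runnable task = queue.take(); // blocks while the queue is empty
                        if (task == stopSignal) {
                            return;                   // run() returns, the thread ends
                        }
                        try {
                            task.run();
                        } catch (RuntimeException e) {
                            System.out.println("Task failed: " + e.getMessage());
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // keep the flag and exit
                }
            });
            workers.add(worker);
            worker.start();
        }

        for (int i = 0; i < taskCount; i++) {
            queue.put(executed::incrementAndGet);
        }
        for (int i = 0; i < nThreads; i++) {
            queue.put(stopSignal); // one marker per worker, queued after all tasks
        }
        for (Thread worker : workers) {
            worker.join();
        }
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Executed tasks: " + runWithTake(4, 5));
    }
}
```

The trade-off is that take() hides the waiting logic, whereas the wait()/notify() version above makes it explicit, which is useful when learning how a pool works.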
When are tasks added?
When the execute() method of ThreadPool is called, it internally calls add() on the queue to add the task.
public void execute(Task task) {
    synchronized (queue) {
        queue.add(task);
        queue.notify();
    }
}
Once a task has been added, one waiting thread is notified that a task is available.
In the above code, we used notify() instead of notifyAll() because notify() has more desirable performance characteristics: it wakes a single waiting thread rather than all of them, which causes far fewer context switches, and that matters in a server application. Take care when using notify() in other situations, though. There are subtle risks associated with it (for example, the notification can go to a thread that cannot make progress), so it is only appropriate under certain specific conditions.
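One situation in which notify() is generally considered safe is the one in this pool: every waiting thread waits for the same condition (a non-empty queue), each notification allows at most one of them to proceed, and each waiter re-checks the condition in a while loop. The sketch below illustrates that idiom; NotifyDemo, TaskMailbox and sumReceived are hypothetical names, not part of the ThreadPool code.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

public class NotifyDemo {

    // Hypothetical minimal queue guarded by its own monitor.
    static class TaskMailbox {
        private final Queue<Integer> items = new ArrayDeque<>();

        synchronized void put(int item) {
            items.add(item);
            notify(); // one new item, so waking one waiter is enough
        }

        synchronized int take() throws InterruptedException {
            // Re-check the condition in a loop: a woken thread may find the
            // item already taken, or may have woken up spuriously.
            while (items.isEmpty()) {
                wait();
            }
            return items.remove();
        }
    }

    // Starts `consumers` threads that each take one item, then puts that
    // many items and returns the sum of everything the consumers received.
    static int sumReceived(int consumers) throws InterruptedException {
        TaskMailbox mailbox = new TaskMailbox();
        AtomicInteger sum = new AtomicInteger();
        Thread[] threads = new Thread[consumers];
        for (int i = 0; i < consumers; i++) {
            threads[i] = new Thread(() -> {
                try {
                    sum.addAndGet(mailbox.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads[i].start();
        }
        for (int i = 1; i <= consumers; i++) {
            mailbox.put(i);
        }
        for (Thread t : threads) {
            t.join();
        }
        return sum.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Sum received: " + sumReceived(3));
    }
}
```

If waiters were waiting for different conditions on the same monitor, a single notify() could wake the wrong one, and notifyAll() would be the safer choice.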
How can threads in ThreadPool be stopped?
The shutdown() method can be used to stop the threads in the ThreadPool. Once shutdown is initiated, previously submitted tasks are still executed, but no new tasks are accepted.
After a thread has executed a task, it checks whether pool shutdown has been initiated. If it has, and the queue does not contain any unexecuted tasks (i.e. the queue size is 0), then the thread calls interrupt() on itself.
public void run() {
    Runnable task;
    while (true) {
        …………………………………………………………………
        …………………………………………………………………
        …………………………………………………………………
        …………………………………………………………………
        /*
         * 1) Check whether pool shutdown has been initiated,
         *    AND
         * 2) the queue does not contain any unexecuted task
         *    (i.e. the queue's size is 0);
         * if both hold, interrupt() the thread.
         */
        if (this.threadPool.isPoolShutDownInitiated()
                && this.threadPool.queue.size() == 0) {
            this.interrupt();
            /*
             * Interrupting only sets a flag on the thread indicating that it
             * has been interrupted; it does not cause the thread to stop
             * immediately. If sleep() is called, the thread immediately
             * throws InterruptedException.
             */
            try {
                Thread.sleep(1);
            } catch (InterruptedException e) {
                System.out.println("InterruptedException while calling sleep.");
            }
            // Leave the loop so that run() returns and the thread ends.
            break;
        }
    }
}
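The interrupt-then-sleep behaviour mentioned in the comment can be observed in isolation. The following is a minimal sketch with hypothetical names (InterruptDemo, sleepThrowsAfterInterrupt); it interrupts the current thread and then checks that sleep() throws.

```java
public class InterruptDemo {

    // Interrupts the current thread, then calls sleep() and reports whether
    // sleep() threw InterruptedException.
    static boolean sleepThrowsAfterInterrupt() {
        Thread.currentThread().interrupt();   // only sets the interrupt flag
        try {
            Thread.sleep(1);                  // sees the flag and throws
            return false;
        } catch (InterruptedException e) {
            return true;                      // the flag is cleared when this is thrown
        }
    }

    public static void main(String[] args) {
        boolean threw = sleepThrowsAfterInterrupt();
        System.out.println("sleep() threw InterruptedException: " + threw);
        System.out.println("Still flagged as interrupted: "
                + Thread.currentThread().isInterrupted());
    }
}
```

Note that execution simply continues after the catch block; a thread only ends when its run() method returns.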
Effective use of ThreadPools
A thread pool is a powerful mechanism for structuring multithreaded applications, but it is not without risk. Applications built with thread pools are subject to the same concurrency risks as any other multithreaded application, such as deadlock, resource thrashing, synchronization errors, thread leakage and request overload.
Some important points:
1. Do not queue tasks that wait synchronously for other tasks, as this can cause a deadlock.
2. If a task has to wait for a resource such as I/O, specify a maximum wait time and then fail or requeue the task. This frees the thread for another task that might complete successfully, so that some progress is made.
3. Tune the thread pool size carefully, and understand that both too few and too many threads can cause problems. The optimum size of a thread pool depends on the number of available processors and the nature of the tasks on the work queue.
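As a rough illustration of the third point, a commonly cited sizing heuristic multiplies the number of processors by the target CPU utilization and by (1 + wait time / compute time). This formula is an assumption brought in for illustration, not something the ThreadPool class in this post computes, and PoolSizeDemo and suggestedPoolSize are hypothetical names. Measured values for a real workload should take precedence over any such estimate.

```java
public class PoolSizeDemo {

    // Commonly cited sizing heuristic:
    //   threads = cores * targetUtilization * (1 + waitTime / computeTime)
    // waitTime and computeTime only need to be in the same unit.
    static int suggestedPoolSize(int cores, double targetUtilization,
                                 double waitTime, double computeTime) {
        double size = cores * targetUtilization * (1 + waitTime / computeTime);
        return Math.max(1, (int) Math.round(size));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU-bound tasks barely wait, so roughly one thread per core.
        System.out.println("CPU-bound tasks: " + suggestedPoolSize(cores, 1.0, 0, 1));
        // Tasks that wait 9x longer than they compute can use more threads.
        System.out.println("I/O-heavy tasks: " + suggestedPoolSize(cores, 1.0, 9, 1));
    }
}
```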
ThreadPool implementation:
ThreadPool.java
package com.thread;

import java.util.concurrent.LinkedBlockingQueue;

public class ThreadPool {
    private final int nThreads;
    private final PoolWorker[] threads;
    private final LinkedBlockingQueue<Task> queue;
    // volatile so that worker threads see the value set by shutdown()
    private volatile boolean poolShutDownInitiated;

    public ThreadPool(int nThreads) {
        this.nThreads = nThreads;
        this.threads = new PoolWorker[this.nThreads];
        this.queue = new LinkedBlockingQueue<Task>();
        for (int i = 0; i < nThreads; i++) {
            threads[i] = new PoolWorker(this);
            threads[i].start();
        }
    }

    public void execute(Task task) {
        synchronized (queue) {
            // No new tasks are accepted once shutdown has been initiated
            if (poolShutDownInitiated) {
                throw new IllegalStateException("ThreadPool is shut down, task rejected.");
            }
            queue.add(task);
            queue.notify();
        }
    }

    public boolean isPoolShutDownInitiated() {
        return poolShutDownInitiated;
    }

    /**
     * Initiates shutdown of ThreadPool, previously submitted tasks
     * are executed, but no new tasks will be accepted.
     */
    public void shutdown() {
        synchronized (queue) {
            this.poolShutDownInitiated = true;
            // Wake every thread waiting on the queue so it can re-check the flag
            queue.notifyAll();
        }
        System.out.println("ThreadPool SHUTDOWN initiated.");
    }
    private class PoolWorker extends Thread {
        private ThreadPool threadPool;

        public PoolWorker(ThreadPool threadPool) {
            this.threadPool = threadPool;
        }

        public void run() {
            Runnable task;
            while (true) {
                synchronized (queue) {
                    // Wait for a task, unless shutdown has been initiated
                    while (queue.isEmpty()
                            && !this.threadPool.isPoolShutDownInitiated()) {
                        try {
                            queue.wait();
                        } catch (InterruptedException e) {
                            System.out.println("Error while queue is waiting:" + e.getMessage());
                        }
                    }
                    task = queue.poll(); // null if the queue is empty
                }
                /* If we don't catch RuntimeException, the pool could leak threads */
                if (task != null) {
                    try {
                        task.run(); // a task is a Runnable, so call run(), not start()
                    } catch (RuntimeException e) {
                        System.out.println("Thread pool is interrupted: " + e.getMessage());
                    }
                }
                /*
                 * 1) Check whether pool shutdown has been initiated,
                 *    AND
                 * 2) the queue does not contain any unexecuted task
                 *    (i.e. the queue's size is 0);
                 * if both hold, interrupt() the thread.
                 */
                if (this.threadPool.isPoolShutDownInitiated()
                        && this.threadPool.queue.size() == 0) {
                    this.interrupt();
                    /*
                     * Interrupting only sets a flag on the thread indicating
                     * that it has been interrupted; it does not cause the
                     * thread to stop immediately. If sleep() is called, the
                     * thread immediately throws InterruptedException.
                     */
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        System.out.println("InterruptedException while calling sleep.");
                    }
                    // Return from run() so that the thread actually ends
                    return;
                }
            }
        }
    }
}
Task.java
package com.thread;

public class Task implements Runnable {
    private int num;

    public Task(int n) {
        num = n;
    }

    public void run() {
        System.out.println("Task " + num + " is running.");
    }
}
ThreadPoolTest.java
package com.thread;

/**
 * Test Thread Pool Scheduler
 * @author rajesh.dixit
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        /* Create a ThreadPool of size 4. */
        ThreadPool pool = new ThreadPool(4);
        for (int i = 0; i < 5; i++) {
            Task task = new Task(i);
            pool.execute(task);
        }
        /* Initiate shutdown once all tasks have been submitted. */
        pool.shutdown();
    }
}