线程池实现

线程池

关于线程池的基本概念在之前的这片文章中提到了,这里主要分享下在尝试实现简单的CachedThreadPool线程池过程中的一些思考以及代码。

await signal VS wait notify

在实现阻塞队列的时候,这是一种典型的生产者-消费者模式,需要用到线程的等待和唤醒。

第一种实现方式是利用Object的waitnotify来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
final Object object = new Object();

public void consumer() throws InterruptedException {
synchronized (object) {
object.wait();
}
}

public void provider() {
synchronized (object) {
object.notify();
}
}

第二种方式是使用Lock和Condition的awaitsignal来实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
ReentrantLock lock = new ReentrantLock();
Condition condition = lock.newCondition();

public void consumer() throws InterruptedException {
lock.lock();
condition.await();
lock.unlock();
}

public void provider() {
lock.lock();
condition.notify();
lock.unlock();
}

简单来说,这两种方式提供的功能是一样的,相比之下,Object的方式理解起来比较简单,Lock的方式提供了更强大的功能,比如

  • Lock可以支持多种Condition,可以更有针对的进行notification,而Object只能single notification
  • Lock的await(long time, TimeUnit unit)支持超时时间的设置

Worker线程的Task如何实现

创建出来的线程当任务完成之后便会Terminated,那要让线程池中的线程长久存在,就需要将Task实现成死循坏,让方法无法退出。但没有任务的时候,又不能占用CPU的时间。

所以就利用了阻塞队列。不断的从队列中取任务来执行,当没有任务的时候,让线程成为WAITING或者TIMED_WAITING状态,等待任务的到来,再唤醒线程,参考下面代码的workerTask()方法。

KeepAlive如何实现

我们知道如果启用了KeepAlive的话,当线程数>核心线程数,并且空闲时间超过了KeepAlive的话,多余的线程便会被销毁。那线程池是如何判断线程空闲的时间超过了设定时间呢?

正是利用了Confition的超时等待方法await(long time, TimeUnit unit),当线程等待超时后,方法会返回false

如何动态的增加Worker线程

由于我这里实现的是CachedThreadPool, 当队列中的任务满了,并且Worker线程数小于最大线程数的时候,便会动态的增加Worker的线程数。
实现方式是利用队列的offer()方法,如果队列满了,该方法便返回false,再来获取当前Worker的线程数,如果小于最大数,便创建,反之则直接拒绝任务。

简单的实现

第一个文件是阻塞队列的实现,第二个文件是线程池的实现。

BlockingQueue.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package com.ndrlslz.core;

import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingQueue<T> {
private LinkedList<T> elements;
private final ReentrantLock putReentrantLock = new ReentrantLock();
private final ReentrantLock takeReentrantLock = new ReentrantLock();
private final Condition notFull = putReentrantLock.newCondition();
private final Condition notEmpty = takeReentrantLock.newCondition();
private int size;
private AtomicInteger elementCount = new AtomicInteger(0);

public BlockingQueue(int size) {
elements = new LinkedList<>();
this.size = size;
}

public T poll(long time, TimeUnit unit) throws InterruptedException {
takeReentrantLock.lock();

try {
while (elementCount.get() == 0) {
boolean timeout = !notEmpty.await(time, unit);
if (timeout) {
return null;
}
}

return takeElement();
} finally {
takeReentrantLock.unlock();
}
}

public T poll() {
T element;
takeReentrantLock.lock();

try {
if (elementCount.get() != 0) {

element = takeElement();
} else {
element = null;
}
} finally {
takeReentrantLock.unlock();
}

return element;
}

public T take() throws InterruptedException {
T element;
takeReentrantLock.lock();

try {
while (elementCount.get() == 0) {
notEmpty.await();
}
element = takeElement();
} finally {
takeReentrantLock.unlock();
}

return element;
}

public void put(T element) throws InterruptedException {
putReentrantLock.lock();

try {
while (elementCount.get() == size) {
notFull.await();
}

addElement(element);
} finally {
putReentrantLock.unlock();
}
}

public boolean offer(T element) {
putReentrantLock.lock();

try {
if (elementCount.get() == size) {
return false;
}

addElement(element);
} finally {
putReentrantLock.unlock();
}

return true;
}

private void addElement(T element) {
elements.addLast(element);

if (elementCount.get() == 0) {
signNotEmpty();
}

elementCount.incrementAndGet();

if (elementCount.get() < size) {
notFull.signal();
}
}

private T takeElement() {
T element = elements.removeFirst();

if (elementCount.get() == size) {
signNotFull();
}

elementCount.decrementAndGet();

if (elementCount.get() > 0) {
notEmpty.signal();
}
return element;
}

private void signNotEmpty() {
takeReentrantLock.lock();
notEmpty.signal();
takeReentrantLock.unlock();
}

private void signNotFull() {
putReentrantLock.lock();
notFull.signal();
putReentrantLock.unlock();

}
}

ThreadPool.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.ndrlslz.core;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class CachedThreadPool {
private static final int KEEP_ALIVE = 30;
private static final int MAX_THREAD_NUMBER = 30;
private static final int CORE_THREAD_NUMBER = 10;
private static final int QUEUE_LENGTH = 20;
private BlockingQueue<Runnable> waitingTasks;
private HashMap<String, Thread> workers;
private ReentrantLock mainLock = new ReentrantLock();
private AtomicInteger wokerCount = new AtomicInteger();

public void submit(Runnable runnable) throws InterruptedException {
boolean offer = waitingTasks.offer(runnable);
if (!offer) {
if (wokerCount.get() < MAX_THREAD_NUMBER) {
addWorker();
waitingTasks.put(runnable);
} else {
System.out.println("Queue and Worker reach max count, Reject");
}
}
}

public CachedThreadPool() {
waitingTasks = new BlockingQueue<>(QUEUE_LENGTH);
workers = new HashMap<>();

for (int i = 0; i < CORE_THREAD_NUMBER; i++) {
addWorker();
}
}

public void stop() {
workers.forEach((s, thread) -> thread.interrupt());
}

private void addWorker() {
Thread worker = new Thread(workerTask());
workers.put(worker.getName(), worker);
wokerCount.incrementAndGet();
worker.start();
}

private void removeWorker() {
workers.remove(Thread.currentThread().getName());
wokerCount.decrementAndGet();
}

private Runnable workerTask() {
return () -> {
try {
while (true) {
Runnable task;
boolean removeIdleThreadLongerThanKeepAlive = workers.size() > CORE_THREAD_NUMBER;
task = removeIdleThreadLongerThanKeepAlive ? waitingTasks.poll(KEEP_ALIVE, TimeUnit.SECONDS) : waitingTasks.take();
if (task != null) {
task.run();
} else {
mainLock.lock();
try {
if (workers.size() > CORE_THREAD_NUMBER) {
removeWorker();
return;
}
} finally {
mainLock.unlock();
}
}
}

} catch (Exception ignored) {

}
};
}
}