0%

JAVA并发编程实战2

并发编程第二部分:结构化并发应用程序

一、任务执行

1.1 Executor框架

任务是一组逻辑执行单元,线程则是使任务异步执行的机制。JAVA类库中,任务执行的主要抽象不是Thread,而是Executor。

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。

在32位机器上,线程数的限制主要是线程栈的地址空间。每个线程维护两个执行栈:一个用于JAVA代码,一个用于原生代码。通常,JVM默认情况下会生成一个复合栈,大小为0.5MB(可以通过Xss或Thread构造函数更改)。操作系统也会有更严格的约束。

基于Executor的Web服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class WebServer {
private static final int N_THREAD = 100;
private static final Executor exec = Executors.newFixedThreadPool(N_THREAD);

public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
};
exec.execute(task);
}
}
}

// 可以通过自定义execute方法的实现,来控制server的行为,如下面的Executor将每个任务在单独的线程执行:
public class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}

Executors中提供了很多静态工厂来创建线程池:newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor、newScheduledThreadPool。服务器压力不大的情况下newCachedThreadPool基本可以满足要求。高并发最好使用newFixedThreadPool。

ExecutorService

ExecutorService扩展了Executor接口,提供了生命周期控制方法。同时Executor只能提供Runnable任务,ExecutorService中增加了Callable。

ExecutorService生命周期的三种状态:运行、关闭、已终止。创建时为运行、shutdown方法执行平缓的关闭过程:不再接受新任务,但是等待正在运行以及已经提交的任务执行完成。shutdownNow将执行粗暴的关闭过程:取消所有运行中的任务、不再启动尚未执行的任务。

找出可用的并行性

Callable与Future

Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。已提交尚未开始的任务可以取消,已经开始的任务只有他们响应中断时才能取消。

Future表示一个任务的生命周期,并提供了方法获取任务已经完成或者取消,以及获取任务执行结果,取消任务(cancel)。get方法也可以指定时限,异常时调用cancel取消任务。

要使用Callable表示无返回值的任务使用Callable<Void>;

CompletionService:Executor与BlockingQueue

CompletionService可以提交Callable任务,然后使用take、poll获取已完成的结果。

1
2
completionService.submit(new Callable<Data> {});
Future<Data> f = completionService.take(); // 这里的f为已经完成的结果,被封装在Future中。

ExecutorService通过invokeAll方法执行多个任务,重载方法可以指定超时。

二、取消与关闭

在任务、线程、服务以及应用程序等模块中的生命周期结束问题,可能会增加他们在设计和实现时的复杂性。java并没有提供某种抢占式的机制来取消操作或者终结线程。相反,它提供了一种协作式的中断机制来实现取消操作,但这要依赖于如何构建取消操作的协议,以及是否遵循这些协议。通过使用FutureTask和Executor框架,可以帮助我们构建可取消的任务和服务。

要使任务和线程能安全、快速、可靠的停止下来,并不容易。JAVA没有提供安全的终止线程的机制。但是提供了中断(interruption),能使一个线程终止另一个线程的当前工作。

取消

如果外部代码能在某个操作正常完成之前将其置入完成状态,那么这个操作就是可取消的。

自定义的取消机制

通过标记位修改来取消。如果任务中有阻塞方法可能会永远无法取消。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@ThreadSafe
public class PrimeGenerator implements Runnable {
@GuardedBy("this")
private final List<BigInteger> primes = new ArrayList<BigInteger>();

private volatile boolean cancelled;

public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}

public void cancel() {
cancelled = true;
}

public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}

使用中断取消线程

如果任务调用了一个阻塞方法,例如BlockingQueue.put,那么任务可能永远不会检查标志,任务就永远不会结束。

每个线程都有一个boolean类型的中断状态。当中断线程时,这个线程的中断状态将被设置成true。在Thread中还包含了中断线程以及查询线程中断状态的方法。

1
2
3
4
5
6
public class Thread {
public void interrupt() {...} // 中断线程
public boolean isInterrupted() {..} // 返回中断状态
public static boolean interrupted() {...} // 清除当前的中断状态,并返回他之前的值
...
}

一些特殊的阻塞库方法支持中断。线程中断是一种协作机制,线程可以通过这种机制通知另一个线程,告诉它在合适的或可能的情况下停止当前工作,并转而执行其他工作。

阻塞库方法,如Thread.sleep和Object.wait等,都会检查线程何时中断,并在发现中断时提前返回。它们响应中断时执行的操作包括:清除中断状态、抛出InterruptedException,表示阻塞操作由于中断而提前结束。

中断并不会真正的中断一个正在运行的线程,只是发出中断请求,然后由线程在下一个合适的时刻中断自己。(这些时刻也被称为取消点)中断是实现取消最合理的方式。 如果任务代码能响应中断,那么可以使用中断作为取消机制,并且利用许多库类中提供的中断支持。通常,中断是实现取消最合理的方式。

通过中断来取消的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class PrimeProducer extends Thread {
private final BlockingQueue<BigInteger> queue;

PrimeProducer(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}

public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!Thread.currentThread().isInterrupted())
queue.put(p = p.nextProbablePrime());
} catch (InterruptedException consumed) {
// 允许线程退出
}
public void cancel() {
interrupt();
}
}

中断策略

正如任务中应该包含中断策略一样,线程应当包含取消策略。

最合理的中断策略是某种形式的线程级取消操作或服务级取消操作:尽块退出,在必要时进行清理。

响应中断

当调用可中断的阻塞函数时,两种处理InterruptedException:

  • 传递异常,使你的方法也成为可中断的方法。
  • 恢复中断状态,使调用栈中的上层代码对其进行处理

只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规任务和库代码中都不应该屏蔽中断请求。

不可取消的任务在退出前恢复中断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public Task getNextTask(BlockingQueue<Task> queue) {
boolean interrupted = false;
try {
while (true) {
try {
return queue.take();
} catch (InterruptedException e) {
interrupted = true;
}
}
} finally {
if (interrupted)
Thread.currentThread().interrupted();
}
}

通过Future实现取消

除非你清楚线程的中断策略,否则不要中断线程,那么在什么情况喜爱调用cancel可以讲参数指定为true?执行任务的线程是由标准的Executor创建的,他实现了一种中断策略使得任务可以通过中断被取消,所以如果任务在标准Executor中运行,并通过它们的Future来取消任务,那么可以设置mayInterruptIfRunning。

通过Future来取消任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException {
Future<?> task = taskExec.submit(r);
try {
task.get(timeout, unit);
} catch (TimeoutException e) {
// 接下来任务被取消
} catch (ExecutionException e) {
// 如果任务中跑出了异常,重新抛出异常
throw launderThrowable(e.getCause());
} finally {
// 如果任务已经结束,执行取消操作也不会带来任何影响。
task.cancel();
}
}

如果Future.get抛出InterruptedException或TimeoutException时,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

停止基于线程的服务

应用程序可以拥有服务,服务也可以拥有工作者线程,但应用程序并不能拥有工作者线程,因此应用程序不能直接停止工作者线程。相反,服务应该提供生命周期方法来关闭自己以及他所拥有的线程。

对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,就应该提供生命周期方法。

提供了可靠取消操作的LogWriter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class LogService {
private final BlockingQueue<String> queue;
private final LoggerThread loggerThread;
private final PrintWriter writer;
@GuardedBy("this") private boolean isShutdown;
@GuardedBy("this") private int reservations;

public void start() { loggerThread.start(); }

public void stop() {
synchronized (this) {
isShutdown = true;
}
loggerThread.interrupted();
}

public void log(String msg) throws InterruptedException {
synchronized (this) {
if (isShutdown)
throw new IllegalStateException(...);
++reservations;
}
queue.put(msg);
}

private class LoggerThread extends Thread {
public void run() {
try {
while (true) {
try {
synchronized (LogService.this) {
if (isShutdown && reservations == 0)
break;
}
String msg = queue.take();
synchronized (LogService.this) { --reservations; }
writer.println(msg);
} catch (InterruptedException e) {
/* retry */
}
}
} finally {
writer.close();
}
}
}
}

关闭ExecutorService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class LogService {
private final ExecutorService exec = newSingleThreadExecutor();
...
public void start() {}

public void stop() throws InterruptedException {
try {
exec.shutdown();
exec.awaitTermination(TIMEOUT, UNIT);
} finally {
writer.close();
}
}
public void log(String msg) {
try {
exec.execute(new WriterTask(msg));
} catch (RejectedExecutionException ignored) {}
}
}

“毒丸”对象

毒丸是指放在队列上的对象,含义是:得到对象时,立即停止

只有在无界队列中,毒丸对象才能可靠的工作。

处理非正常的线程终止

导致线程提前死亡的原因主要是RuntimeException,

主动处理线程终止

线程池内部创建一个工作者线程,如果任务抛出了一个未检查异常,那么它将使线程终结,但会首先通知框架该线程已经终结。然后,框架可能会用新的线程来代替这个工作线程,也可能不会,因为线程正在关闭,或者当前已经有足够多的线程能满足需要。

1
2
3
4
5
6
7
8
9
10
11
public void run() {
Throwable thrown = null;
try {
while (!isInterrupted())
runTask(getTaskFromWorkQueue());
} catch (Throwable e) {
thrown = e;
} finally {
threadExited(this, thrown);
}
}

未捕获异常的处理

Thread API中提供了UncaughtExceptionHandler,它能检测出某个线程由于未捕获的异常而终结的情况。

在运行时间较长的应用中,通常会为所有线程的未捕获异常指定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

要为线程池中的所有线程设置一个UncaughtExcptionHandler,需要为ThreadPoolExecutor的构造函数提供一个ThreadFactory。

!!只有通过execute提交的任务才能将抛出的异常交给未捕获的异常处理器,而通过submit提交的任务,无论是未检查的异常还是已检查的异常,都被认为是任务返回状态的一部分。如果一个由submit提交的任务由于抛出了异常而结束,那么这个异常将被Future.get封装在ExecutionException中重新抛出。

JVM关闭

关闭钩子

在正常关闭中,JVM首先调用所有已注册的关闭钩子。关闭钩子是指通过Runtime.addShutdownHook注册的但尚未开始的线程。JVM并不会停止或中断任何在关闭时仍然运行的应用程序线程。

守护线程

线程可分为两种:普通线程和守护线程。在JVM启动时创建的所有线程中,除了主线程以外,其他的线程都是守护线程(如垃圾回收线程)。当创建一个新线程时,新线程将继承创建他的线程的守护状态,因为在默认情况下住线程创建的线程都是普通线程。

JVM在退出时,守护线程的状态不会影响JVM的退出,所有守护线程都被抛弃,不会执行finally代码块,而不会执行回卷栈,只是退出。

我们应尽量少的使用守护线程——很少有操作能在不进行清理的情况下被安全的抛弃。

三、线程池的使用

在线程池的线程不应该使用ThreadLocal在任务之间传值(因为线程会共用)。

只有当任务都是同类型并且相互独立时,线程池的性能才能达到最佳。

线程饥饿死锁:线程池中的任务需要无限期等待一些必须由池中其他任务才能提供的资源或条件。

如果任务阻塞的时间过长,即使不出现死锁,线程池的响应也会变的糟糕。

设置线程池的大小

对于计算密集型的任务$N_{cpu}$个处理器的系统上,当线程池的大小为$N_{cpu}+1$时,通常能实现最优的利用率(当计算密集型的线程偶尔由于页缺失故障或者其他而暂停时,这个额外的线程也能确保CPU始终周期不会被浪费)。

要使处理器达到期望的使用率,线程池的最优大小等于:
$$N_{threads} = N_{cpu}U_{cpu}(1+W/C)$$
$U_{cpu}$为目标cpu利用率,$N_{cpu}$为cpu核数,$W/C$为cpu核数, CPU等待时间与计算时间的比值

配置ThreadPoolExecutor

如果无限制创建线程,将导致不稳定,并通过采取固定大小的线程池来解决。然而,如果请求的到达速率超过处理速率,请求将累计,耗尽资源。

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列,有界队列,同步移交(Synchronous Handoff)

newFixedThreadPool,newSingleThreadExecutor默认情况下使用一个无界的LinkedBlockingQueue。

一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue、PriorityBlockingQueue。

对于非常大的或者无界的线程池,可以通过SynchronousQueue来避免任务排队,以及直接将人物从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。newCachedThreadPool就使用了这种机制。

newCachedThreadPool能提供比固定大小线程池更好的排队性能。
当要限制当前任务数量以满足资源管理需求时,可以选择固定大小的线程池,比如后端服务。

只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果线程存在依赖关系,有界的线程池就可能导致线程饥饿死锁问题。

饱和策略

有界队列满了之后,饱和策略发挥作用。ThreadPoolExecutor的饱和策略可以通过setRejectedExecutionHandler来修改。

线程工厂

使用定制的线程工厂方法。比如,为线程池中的线程制定一个UncaughtExceptionHandler,或实例化一个定制的Thread用于执行调试信息的记录。

扩展ThreadPoolExecutor

子类中可以改写:beforeExecute,afterExecute,terminated