0%

并发编程(OnJava8学习笔记)

并发编程

概述

术语

并发(Concurrency): 同时完成多个任务。在开始处理其他任务之前,当前任务不需要完成。并发解决了阻塞发生的问题。当任务无法进一步执行,直到外部环境发生变化时才会继续执行。最常见的例子是I/O,其中任务必须等待一些input(在这种情况下会被阻止)。这个问题产生在I/O密集型。

并行(parallelism):同时在多个地方完成多个任务。这解决了所谓的计算密集型问题,如果将程序分成多个部分并在不同的处理器上编辑不同的部分,程序可以运行得更快。

作者的定义(非标准化术语)

  • 纯并发:任务仍然在单个CPU上运行。纯并发系统产生的结果比顺序系统更快,但如果有更多的处理器,则运行速度不会更快
  • 并发-并行:使用并发技术,结果程序利用更多处理器并更快地生成结果
  • 并行-并发:使用并行编程技术编写,如果只有一个处理器,结果程序仍然可以运行(Java 8 Streams就是一个很好的例子)。
  • 纯并行:除非有多个处理器,否则不会运行。

并发的新定义:并发性是一系列性能技术,专注于减少等待。

单处理器系统中性能改进的一个常见例子是事件驱动编程,特别是用户界面编程。考虑一个程序执行一些长时间运行操作,从而最终忽略用户输入和无响应。如果你有一个“退出”按钮,你不想在你编写的每段代码中轮询它。这会产生笨拙的代码,无法保证程序员不会忘记执行检查。没有并发性,生成响应式用户界面的唯一方法是让所有任务定期检查用户输入。通过创建单独的执行线程来响应用户输入,该程序保证了一定程度的响应。

Java在顺序语言之上添加对线程的支持而不是在多任务操作系统中分配外部进程,线程在执行程序所代表的单个进程中创建任务交换

四句格言

  1. 不要这样做(如果有一种方法可以在更快的机器上运行您的程序,或者如果您可以对其进行分析并发现瓶颈并在该位置交换更快的算法,那么请执行此操作。只有在显然没有其他选择时才开始使用并发,并且仅在特定的地方使用.如果您被迫进行并发,请采取最简单,最安全的方法来解决问题。)

  2. 没有什么是真的,一切可能都有问题(你必须质疑一切。即使将变量设置为某个值也可能或者可能不会按预期的方式工作,并且从那里开始走下坡路。我已经很熟悉的东西,认为它显然有效但实际上并没有。)

  3. 它起作用,并不意味着它没有问题(你不能证明并发程序是正确的,你只能(有时)证明它是不正确的。大多数情况下你甚至不能这样做:如果它有问题,你可能无法检测到它。您通常不能编写有用的测试,因此您必须依靠代码检查结合深入的并发知识来发现错误。即使是有效的程序也只能在其设计参数下工作。当超出这些设计参数时,大多数并发程序会以某种方式失败。)

  4. 你仍然必须理解它

Java中的并发

为了获得正确的并发性,语言功能必须从头开始设计并考虑并发性。Java将不再是为并发而设计的语言,而只是一种允许它的语言。

Java 8中的改进:并行流和CompletableFutures

并行流

它很强

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package a24_Concurrent_programming.trival;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.stream.Collectors;

import static java.util.stream.IntStream.iterate;
import static java.util.stream.LongStream.rangeClosed;

public class ParallelPrime {
static final int COUNT = 100_000;

public static boolean isPrime(long n) {
return rangeClosed(2, (long) Math.sqrt(n)).noneMatch(i -> n % i == 0);
}

public static void main(String[] args)
throws IOException {
Instant before = Instant.now();
List<String> primes =
iterate(2, i -> i + 1)
.parallel()
.filter(ParallelPrime::isPrime)
.limit(COUNT)
.mapToObj(Long::toString)
.collect(Collectors.toList());
Instant after = Instant.now();
System.out.println(Duration.between(before, after));
Files.write(Paths.get("primes.txt"), primes, StandardOpenOption.CREATE);
}

}

// PT0.681681S
// 去掉并行流 PT1.039754S

它有时候也不行

使用iterate()生成序列,由于每次生成数字都必须调用lambda,导致减速严重。

无状态生成器的行为类似于数组

迭代生成器的行为类似于链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package a24_Concurrent_programming.trival;

import java.time.Duration;
import java.time.Instant;
import java.util.stream.*;
import java.util.function.*;

public class Summing {
static void timeTest(String id, long checkValue, LongSupplier operation) {
System.out.print(id + ": ");
Instant before = Instant.now();
long result = operation.getAsLong();
Instant after = Instant.now();
if (result == checkValue)
System.out.println(Duration.between(before, after));
else
System.out.format("result: %d%n checkValue: %d%n", result, checkValue);
}

// public static final int SZ = 100_000_000;// This even works
public static final int SZ = 1_000_000_000;
public static final long CHECK = (long) SZ * ((long) SZ + 1) / 2;

public static void main(String[] args) {
System.out.println(CHECK);
timeTest("Sum Stream", CHECK, () ->
LongStream.rangeClosed(0, SZ).sum());
timeTest("Sum Stream Parallel", CHECK, () ->
LongStream.rangeClosed(0, SZ).parallel().sum());
timeTest("Sum Iterated", CHECK, () ->
LongStream.iterate(0, i -> i + 1)
.limit(SZ + 1).sum());
// Slower & runs out of memory above 1_000_000:
timeTest("Sum Iterated Parallel", CHECK, () ->
LongStream.iterate(0, i -> i + 1)
.parallel()
.limit(SZ+1).sum());
}
}

对于long数组来说

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package a24_Concurrent_programming.trival;

import java.util.Arrays;

public class Summing2 {
static long basicSum(long[] ia) {
long sum = 0;
int size = ia.length;
for (int i = 0; i < size; i++)
sum += ia[i];
return sum;
}

// Approximate largest value of SZ before
// running out of memory on mymachine:
public static final int SZ = 20_000_000;
public static final long CHECK = (long) SZ * ((long) SZ + 1) / 2;

public static void main(String[] args) {
System.out.println(CHECK);
long[] la = new long[SZ + 1];
Arrays.parallelSetAll(la, i -> i);
Summing.timeTest("Array Stream Sum", CHECK, () ->
Arrays.stream(la).sum());
Summing.timeTest("Parallel", CHECK, () ->
Arrays.stream(la).parallel().sum());
Summing.timeTest("Basic Sum", CHECK, () ->
basicSum(la));// Destructive summation:
Summing.timeTest("parallelPrefix", CHECK, () -> {
Arrays.parallelPrefix(la, Long::sum);
return la[la.length - 1];
});
}
}

Long数组?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package a24_Concurrent_programming.trival;

import java.util.*;
public class Summing3 {
static long basicSum(Long[] ia) {
long sum = 0;
int size = ia.length;
for(int i = 0; i < size; i++)
sum += ia[i];
return sum;
}
// Approximate largest value of SZ before
// running out of memory on my machine:
public static final int SZ = 10_000_000;
public static final long CHECK = (long)SZ * ((long)SZ + 1)/2;
public static void main(String[] args) {
System.out.println(CHECK);
Long[] aL = new Long[SZ+1];
Arrays.parallelSetAll(aL, i -> (long)i);
Summing.timeTest("Long Array Stream Reduce", CHECK, () ->
Arrays.stream(aL).reduce(0L, Long::sum));
Summing.timeTest("Long Basic Sum", CHECK, () ->
basicSum(aL));
// Destructive summation:
Summing.timeTest("Long parallelPrefix",CHECK, ()-> {
Arrays.parallelPrefix(aL, Long::sum);
return aL[aL.length - 1];
});
}
}

这种时间增加的一个重要原因是处理器内存缓存。使用Summing2.java中的原始long,数组la是连续的内存。处理器可以更容易地预测该阵列的使用,并使缓存充满下一个需要的阵列元素。访问缓存比访问主内存快得多。似乎 Long parallelPrefix 计算受到影响,因为它为每个计算读取两个数组元素,并将结果写回到数组中,并且每个都为Long生成一个超出缓存的引用。

aL是一个Long数组,它不是一个连续的数据数组,而是一个连续的Long对象引用数组。尽管该数组可能会在缓存中出现,但指向的对象几乎总是超出缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
public class ParallelStreamPuzzle {
static class IntGenerator
implements Supplier<Integer> {
private int current = 0;
public Integer get() {
return current++;
}
}
public static void main(String[] args) {
List<Integer> x = Stream.generate(newIntGenerator())
.limit(10)
.parallel() // [1]
.collect(Collectors.toList());
System.out.println(x);
}
}

如果[1]注释运行它,它会产生预期的:
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
每次。但是包含了parallel(),它看起来像一个随机数生成器,带有输出(从一次运行到下一次运行不同),如:
[0, 3, 6, 8, 11, 14, 17, 20, 23, 26]

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package a24_Concurrent_programming.trival;

import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.nio.file.*;
public class ParallelStreamPuzzle2 {
public static final Deque<String> trace = new ConcurrentLinkedDeque<>();
static class IntGenerator implements Supplier<Integer> {
private AtomicInteger current =
new AtomicInteger();
public Integer get() {
trace.add(current.get() + ": " +Thread.currentThread().getName());
return current.getAndIncrement();
}
}
public static void main(String[] args) throws Exception {
List<Integer> x = Stream.generate(new IntGenerator())
.limit(50)
.parallel()
.collect(Collectors.toList());
System.out.println(x);
System.out.println(trace);
}
}

基本上,当你将parallel()与limit()结合使用时,输出不受控制

怎么做?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import java.util.*;
import java.util.stream.*;
public class ParallelStreamPuzzle3 {
public static void main(String[] args) {
List<Integer> x = IntStream.range(0, 30)
.peek(e -> System.out.println(e + ": " + Thread.currentThread()
.getName()))
.limit(10)
.parallel()
.boxed()
.collect(Collectors.toList());
System.out.println(x);
}
}

创建和运行任务

基本工具

  • Tasks and Executors

在Java的早期版本中,您通过直接创建自己的Thread对象来使用线程,甚至将它们子类化以创建您自己的特定“任务线程”对象。你手动调用了构造函数并自己启动了线程。

在Java 5中,添加了类来为您处理线程池。您可以将任务创建为单独的类型,然后将其交给ExecutorService以运行该任务,而不是为每种不同类型的任务创建新的Thread子类型。ExecutorService为您管理线程,并且在运行任务后重新循环线程而不是丢弃线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package a24_Concurrent_programming.b1_create_and_run_tast;

public class NapTask implements Runnable{
final int id;

public NapTask(int id) {
this.id = id;
}

@Override
public void run() {
new Nap(0.1);
System.out.println(this + " " + Thread.currentThread().getName());
}

@Override
public String toString() {
return "a24_Concurrent_programming.b1_create_and_run_tast.NapTask{" + id + '}';
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package a24_Concurrent_programming.b1_create_and_run_tast;

import java.util.concurrent.TimeUnit;

public class Nap {
public static void main(String[] args) {
Nap nap = new Nap(1);
}

public Nap(double t) {
try {
TimeUnit.MILLISECONDS.sleep((long) (1000 * t));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public Nap(double t, String msg) {
this(t);
System.out.println(msg);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package a24_Concurrent_programming.b1_create_and_run_tast;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService exec = Executors.newCachedThreadPool();
IntStream.range(0,10).mapToObj(NapTask::new).forEach(exec::execute);
System.out.println("All tasks submitted");
exec.shutdown(); // shutdown告诉ExecutorService完成已经提交的任务,但不接受任何新任务。
// 如果只调用shutdown程序也将完成所有任务
// exec.shutdownNow() 出了不接受新任务,也会尝试终端当前线程
while (!exec.isTerminated()) { // 任务仍然在运行,因此我们必须等到它们在退出main()之前完成。这是通过检查exec.isTerminated()来实现的,这在所有任务完成后变为true。
System.out.println(Thread.currentThread().getName() + " awaiting termination"); //main()中线程的名称是main,并且只有一个其他线程pool-1-thread-1。此外,交错输出显示两个线程确实同时运行。
new Nap(0.1);
}
}
}

没有SingleThreadExecutor类。newSingleThreadExecutor()Executors中的工厂,它创建特定类型的Executor

CachedThreadPool的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package a24_Concurrent_programming.b3_cached_thread_pool;

public class InterferingTask implements Runnable {
final int id;
private static Integer val = 0;
public InterferingTask(int id) {
this.id = id;
}
@Override
public void run() {
for(int i = 0; i < 100; i++)
val++;
System.out.println(id + " "+
Thread.currentThread().getName() + " " + val);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
package a24_Concurrent_programming.b3_cached_thread_pool;

import java.util.concurrent.*;
import java.util.stream.*;
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService exec
=Executors.newCachedThreadPool();
IntStream.range(0, 100)
.mapToObj(InterferingTask::new)
.forEach(exec::execute);
exec.shutdown();
}
}

所有的任务都试图写入val的单个实例,并且他们正在踩着彼此的脚趾。我们说这样的类不是线程安全的。

解决问题

避免竞争条件的最好方法是避免可变的共享状态。我们可以称之为自私的孩子原则:什么都不分享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package a24_Concurrent_programming.b3_cached_thread_pool2;

import java.util.concurrent.*;

public class CountingTask implements Callable<Integer> {
final int id;

public CountingTask(int id) {
this.id = id;
}

@Override
public Integer call() {
Integer val = 0;
for (int i = 0; i < 100; i++)
val++;
System.out.println(id + " " +
Thread.currentThread().getName() + " " + val);
return val;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
package a24_Concurrent_programming.b3_cached_thread_pool2;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.*;

public class CachedThreadPool {
public static Integer extractResult(Future<Integer> f) {
try {
return f.get();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exec =
Executors.newCachedThreadPool();
List<CountingTask> tasks =
IntStream.range(0, 10)
.mapToObj(CountingTask::new)
.collect(Collectors.toList());
List<Future<Integer>> futures =
exec.invokeAll(tasks);
Integer sum = futures.stream()
.map(CachedThreadPool::extractResult)
.reduce(0, Integer::sum);
System.out.println("sum = " + sum);
exec.shutdown();
}
}

只有在所有任务完成后,invokeAll()才会返回一个Future列表,每个任务一个FutureFuture是Java 5中引入的机制,允许您提交任务而无需等待它完成。

Future似乎是多余的,因为**invokeAll()**甚至在所有任务完成之前都不会返回。但是,这里的Future并不用于延迟结果,而是用于捕获任何可能发生的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package a24_Concurrent_programming.trival;

import a24_Concurrent_programming.b3_cached_thread_pool2.CountingTask;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Futures {
public static void main(String[] args)throws InterruptedException, ExecutionException {
ExecutorService exec
=Executors.newSingleThreadExecutor();
Future<Integer> f =
exec.submit(new CountingTask(99));
System.out.println(f.get());
exec.shutdown();
}
}

当你的任务尚未完成的Future上调用**get()**时,调用会阻塞(等待)直到结果可用。

当你调用get()时,Future会阻塞,所以它只能解决等待任务完成的问题。最终,Futures被认为是一种无效的解决方案,现在不鼓励,支持Java 8的CompletableFuture

并行流在这种情况下是一种相对好的解决办法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package a24_Concurrent_programming.trival;
import a24_Concurrent_programming.b3_cached_thread_pool2.CountingTask;

import java.util.stream.*;
public class CountingStream {
public static void main(String[] args) {
System.out.println(
IntStream.range(0, 10)
.parallel()
.mapToObj(CountingTask::new)
.map(ct -> ct.call())
.reduce(0, Integer::sum));
}
}

Lambda和方法引用作为任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package a24_Concurrent_programming.b4_lambda_and_method_ref;

import java.util.concurrent.*;
class NotRunnable {
public void go() {
System.out.println("NotRunnable");
}
}
class NotCallable {
public Integer get() {
System.out.println("NotCallable");
return 1;
}
}
public class LambdasAndMethodReferences {
public static void main(String[] args)throws InterruptedException {
ExecutorService exec =
Executors.newCachedThreadPool();
exec.submit(() -> System.out.println("Lambda1"));
exec.submit(new NotRunnable()::go);
exec.submit(() -> {
System.out.println("Lambda2");
return 1;
});
exec.submit(new NotCallable()::get);
exec.shutdown();
}
}

终止耗时任务

最初的Java设计提供了中断运行任务的机制(为了向后兼容,仍然存在);中断机制包括阻塞问题。中断任务既乱又复杂,因为您必须了解可能发生中断的所有可能状态,以及可能导致的数据丢失。使用中断被视为反对模式,但我们仍然被迫接受。

任务终止的最佳方法是设置任务周期性检查的标志。然后任务可以通过自己的shutdown进程并正常终止。不是在任务中随机关闭线程,而是要求任务在到达了一个较好时自行终止。这总是产生比中断更好的结果,以及更容易理解的更合理的代码。

以这种方式终止任务听起来很简单:设置任务可以看到的boolean flag。编写任务,以便定期检查标志并执行正常终止。这实际上就是你所做的,但是有一个复杂的问题:我们的旧克星,共同的可变状态。如果该标志可以被另一个任务操纵,则存在碰撞可能性。

在研究Java文献时,你会发现很多解决这个问题的方法,经常使用volatile关键字。我们将使用更简单的技术并避免所有易变的参数,这些都在附录:低级并发中有所涉及。

Java 5引入了Atomic类,它提供了一组可以使用的类型,而不必担心并发问题。我们将添加AtomicBoolean标志,告诉任务清理自己并退出。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import java.util.concurrent.atomic.AtomicBoolean;import onjava.Nap;
public class QuittableTask implements Runnable {
final int id;
public QuittableTask(int id) {
this.id = id;
}
private AtomicBoolean running =
new AtomicBoolean(true);
public void quit() {
running.set(false);
}
@Override
public void run() {
while(running.get()) // [1]
new Nap(0.1);
System.out.print(id + " "); // [2]
}
}

CompletableFuture

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package a24_Concurrent_programming.b5_completable_future;

import java.util.*;
import java.util.stream.*;
import java.util.concurrent.*;

import a24_Concurrent_programming.b1_create_and_run_tast.Nap;
import a24_Concurrent_programming.b2_completable_future.QuittableTask;
public class QuittingCompletable {
public static void main(String[] args) {
List<QuittableTask> tasks =
IntStream.range(1, 1000)
.mapToObj(QuittableTask::new)
.collect(Collectors.toList());
List<CompletableFuture<Void>> cfutures =
tasks.stream()
.map(CompletableFuture::runAsync)
.collect(Collectors.toList());
new Nap(1);
tasks.forEach(QuittableTask::quit);
cfutures.forEach(CompletableFuture::join);
}
}

在创建cfutures期间,每个任务都交给CompletableFuture::runAsync。这执行VerifyTask.run()并返回CompletableFuture 。因为run()不返回任何内容,所以在这种情况下我只使用CompletableFuture调用**join()**来等待它完成。

基本用法

这是一个带有静态方法**work()**的类,它对该类的对象执行某些工作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// concurrent/Machina.java
import onjava.Nap;
public class Machina {
public enum State {
START, ONE, TWO, THREE, END;
State step() {
if(equals(END))
return END;
return values()[ordinal() + 1];
}
}
private State state = State.START;
private final int id;
public Machina(int id) {
this.id = id;
}
public static Machina work(Machina m) {
if(!m.state.equals(State.END)){
new Nap(0.1);
m.state = m.state.step();
}
System.out.println(m);return m;
}
@Override
public StringtoString() {
return"Machina" + id + ": " + (state.equals(State.END)? "complete" : state);
}
}

这是一个有限状态机,一个微不足道的机器,因为它没有分支……它只是从头到尾遍历一条路径。**work()**方法将机器从一个状态移动到下一个状态,并且需要100毫秒才能完成“工作”。

我们可以用CompletableFuture做的一件事是使用**completedFuture()**将它包装在感兴趣的对象中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// concurrent/CompletedMachina.java
import java.util.concurrent.*;
public class CompletedMachina {
public static void main(String[] args) {
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0));
try {
Machina m = cf.get(); // Doesn't block
} catch(InterruptedException |
ExecutionException e) {
throw new RuntimeException(e);
}
}
}

completedFuture()创建一个“已经完成”的CompletableFuture。对这样一个未来做的唯一有用的事情是get()里面的对象,所以这看起来似乎没有用。注意CompletableFuture被输入到它包含的对象。这个很重要。

通常,get()在等待结果时阻塞调用线程。此块可以通过InterruptedExceptionExecutionException中断。在这种情况下,阻止永远不会发生,因为CompletableFutureis已经完成,所以答案立即可用。

当我们将Machina包装在CompletableFuture中时,我们发现我们可以在CompletableFuture上添加操作来处理所包含的对象,事情变得更加有趣:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// concurrent/CompletableApply.java
import java.util.concurrent.*;
public class CompletableApply {
public static void main(String[] args) {
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0));
CompletableFuture<Machina> cf2 =
cf.thenApply(Machina::work);
CompletableFuture<Machina> cf3 =
cf2.thenApply(Machina::work);
CompletableFuture<Machina> cf4 =
cf3.thenApply(Machina::work);
CompletableFuture<Machina> cf5 =
cf4.thenApply(Machina::work);
}
}
/* Output:
Machina0: ONE
Machina0: TWO
Machina0: THREE
Machina0: complete
*/

thenApply()应用一个接受输入并产生输出的函数。在这种情况下,work()函数产生与它相同的类型,因此每个得到的CompletableFuture仍然被输入为Machina,但是(类似于Streams中的map()Function也可以返回不同的类型,这将反映在返回类型

您可以在此处看到有关CompletableFutures的重要信息:它们会在您执行操作时自动解包并重新包装它们所携带的对象。这样你就不会陷入麻烦的细节,这使得编写和理解代码变得更加简单。

我们可以消除中间变量并将操作链接在一起,就像我们使用Streams一样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// concurrent/CompletableApplyChained.javaimport java.util.concurrent.*;
import onjava.Timer;
public class CompletableApplyChained {
public static void main(String[] args) {
Timer timer = new Timer();
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0))
.thenApply(Machina::work)
.thenApply(Machina::work)
.thenApply(Machina::work)
.thenApply(Machina::work);
System.out.println(timer.duration());
}
}
/* Output:
Machina0: ONE
Machina0: TWO
Machina0: THREE
Machina0: complete
514
*/

在这里,我们还添加了一个Timer,它向我们展示每一步增加100毫秒,还有一些额外的开销。
CompletableFutures的一个重要好处是它们鼓励使用私有子类原则(不分享任何东西)。默认情况下,使用**thenApply()**来应用一个不与任何人通信的函数 - 它只需要一个参数并返回一个结果。这是函数式编程的基础,并且它在并发性方面非常有效[^5]。并行流和ComplempleFutures旨在支持这些原则。只要您不决定共享数据(共享非常容易,甚至意外)您可以编写相对安全的并发程序。

回调thenApply()开始一个操作,在这种情况下,在完成所有任务之前,不会完成e CompletableFuture的创建。虽然这有时很有用,但是启动所有任务通常更有价值,这样就可以运行时继续前进并执行其他操作。我们通过在操作结束时添加Async来实现此目的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// concurrent/CompletableApplyAsync.java
import java.util.concurrent.*;
import onjava.*;
public class CompletableApplyAsync {
public static void main(String[] args) {
Timer timer = new Timer();
CompletableFuture<Machina> cf =
CompletableFuture.completedFuture(
new Machina(0))
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work)
.thenApplyAsync(Machina::work);
System.out.println(timer.duration());
System.out.println(cf.join());
System.out.println(timer.duration())
}
}
/* Output:
116
Machina0: ONE
Machina0: TWO
Machina0:THREE
Machina0: complete
Machina0: complete
552
*/

同步调用(我们通常使用得那种)意味着“当你完成工作时,返回”,而异步调用以意味着“立刻返回但是继续后台工作。”正如你所看到的,cf的创建现在发生得跟快。每次调用 thenApplyAsync() 都会立刻返回,因此可以进行下一次调用,整个链接序列的完成速度比以前快得快。

事实上,如果没有回调cf.join() t方法,程序会在完成其工作之前退出(尝试取出该行)对**join()**阻止了main()进程的进行,直到cf操作完成,我们可以看到大部分时间的确在哪里度过。

这种“立即返回”的异步能力需要CompletableFuture库进行一些秘密工作。特别是,它必须将您需要的操作链存储为一组回调。当第一个后台操作完成并返回时,第二个后台操作必须获取生成的Machina并开始工作,当完成后,下一个操作将接管,等等。但是没有我们普通的函数调用序列,通过程序调用栈控制,这个顺序会丢失,所以它使用回调 - 一个函数地址表来存储。

幸运的是,您需要了解有关回调的所有信息。程序员将你手工造成的混乱称为“回调地狱”。通过异步调用,CompletableFuture为您管理所有回调。除非你知道关于你的系统有什么特定的改变,否则你可能想要使用异步调用。

  • 其他操作
    当您查看CompletableFuture的Javadoc时,您会看到它有很多方法,但这个方法的大部分来自不同操作的变体。例如,有thenApply(),thenApplyAsync()和thenApplyAsync()的第二种形式,它接受运行任务的Executor(在本书中我们忽略了Executor选项)。

这是一个显示所有“基本”操作的示例,它们不涉及组合两个CompletableFutures或异常(我们将在稍后查看)。首先,我们将重复使用两个实用程序以提供简洁和方便:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// concurrent/CompletableUtilities.java
package onjava; import java.util.concurrent.*;
public class CompletableUtilities {
// Get and show value stored in a CF:
public static void showr(CompletableFuture<?> c) {
try {
System.out.println(c.get());
} catch(InterruptedException
| ExecutionException e) {
throw new RuntimeException(e);
}
}
// For CF operations that have no value:
public static void voidr(CompletableFuture<Void> c) {
try {
c.get(); // Returns void
} catch(InterruptedException
| ExecutionException e) {
throw new RuntimeException(e);
}
}
}

showr()在CompletableFuture 上调用get()并显示结果,捕获两个可能的异常。voidr()是CompletableFuture 的showr()版本,即CompletableFutures,仅在任务完成或失败时显示。

为简单起见,以下CompletableFutures只包装整数。cfi()是一个方便的方法,它在完成的CompletableFuture 中包装一个int:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// concurrent/CompletableOperations.java
import java.util.concurrent.*;
import static onjava.CompletableUtilities.*;
public class CompletableOperations {
static CompletableFuture<Integer> cfi(int i) {
return CompletableFuture.completedFuture( Integer.valueOf(i));
}
public static void main(String[] args) {
showr(cfi(1)); // Basic test
voidr(cfi(2).runAsync(() ->
System.out.println("runAsync")));
voidr(cfi(3).thenRunAsync(() ->
System.out.println("thenRunAsync")));
voidr(CompletableFuture.runAsync(() ->
System.out.println("runAsync is static")));
showr(CompletableFuture.supplyAsync(() -> 99));
voidr(cfi(4).thenAcceptAsync(i ->
System.out.println("thenAcceptAsync: " + i)));
showr(cfi(5).thenApplyAsync(i -> i + 42));
showr(cfi(6).thenComposeAsync(i -> cfi(i + 99)));
CompletableFuture<Integer> c = cfi(7);
c.obtrudeValue(111);
showr(c);
showr(cfi(8).toCompletableFuture());
c = new CompletableFuture<>();
c.complete(9);
showr(c);
c = new CompletableFuture<>();
c.cancel(true);
System.out.println("cancelled: " + c.isCancelled());
System.out.println("completed exceptionally: " +
c.isCompletedExceptionally());
System.out.println("done: " + c.isDone());
System.out.println(c);
c = new CompletableFuture<>();
System.out.println(c.getNow(777));
c = new CompletableFuture<>();
c.thenApplyAsync(i -> i + 42)
.thenApplyAsync(i -> i * 12);
System.out.println("dependents: " + c.getNumberOfDependents());
c.thenApplyAsync(i -> i / 2);
System.out.println("dependents: " + c.getNumberOfDependents());
}
}
/* Output:
1
runAsync
thenRunAsync
runAsync is static
99
thenAcceptAsync: 4
47
105
111
8
9
cancelled: true
completed exceptionally: true
done: true
java.util.concurrent.CompletableFuture@6d311334[Complet ed exceptionally]
777
dependents: 1
dependents: 2
*/

main()包含一系列可由其int值引用的测试。cfi(1)演示了showr()正常工作。cfi(2)是调用runAsync()的示例。由于Runnable不产生返回值,因此结果是CompletableFuture ,因此使用voidr()。