Java8新特性之Stream流
1.什么是Stream?
Java 8 API添加了一个新的抽象称为流Stream,可以让你以一种声明的方式处理数据。通过声明性方式,能够对集合中的每个元素进行一系列并行或串行的流水线操作。例:
1 2 3 4 5 6 7
| public class StreamDemo1 { public static void main(String[] args) { int[] numbers = {1,2,3}; int num2 = IntStream.of(numbers).sum(); System.out.println(num2); } }
|
2.创建流的几种方式
2.1 使用集合
1 2 3
| ArrayList arrayList = new ArrayList(); arrayList.stream(); arrayList.parallelStream();
|
2.2 使用数组
1
| Arrays.stream(new int[]{1,2,3});
|
2.3 数字Stream
1 2
| IntStream.of(1,2,3); IntStream.rangeClosed(1,10);
|
2.4 使用Stream.generate()
1
| Stream.generate(()-> "stream").limit(5);
|
3. 常用方法
流模型的操作很丰富,这里介绍一些常用的API。这些方法可以被分成两种:
- 延迟方法:返回值任然是Stream接口自身类型的方法,因此支持链式调用(除了终结方法外,其他都是方法均为延迟方法)
- 终结方法:返回值类型不再是Stream接口自身类型的方法。
3.1 延迟方法
1. filter
可以通过filter
方法将一个流转换成另一个子集流。
1
| Stream.of(1,2,3).filter(i -> i>2).forEach(System.out::println);
|
2.peek
用于debug,foreach是最终操作
1
| Stream.of("张三","李四","王麻子").peek(System.out::println).forEach(System.out::println);
|
3.map
将流中的元素映射到另一个流中
1
| Stream.of("张三","李四","王麻子").map(String::length).forEach(System.out::println);
|
4.limit
对流进行截取,只取用前n个
1
| Stream.of("张三","李四","王麻子").limit(2).forEach(System.out::println);
|
5.skip
跳过前几个
1
| Stream.of("张三","李四","王麻子").skip(2).forEach(System.out::println);
|
3.2 终止方法
1.forEach
迭代
1
| Stream.of(1,2,3).forEach(System.out::println);
|
2.count
计算个数
1
| System.out.println(Stream.of("张三","李四","王麻子").count());
|
3.collect
收集到list
1 2
| List<String> list = Stream.of(str.split(" ")).collect(Collectors.toList()); System.out.println("收集到list:"+list);
|
4.reduce
从Stream中生成一个值
1 2
| Optional<String> newStr = Stream.of(str.split("")).reduce((str1, str2) -> str1 + "-" + str2); System.out.println(newStr.orElse(""));
|
1 2
| String newStr2 = Stream.of(str.split("")).reduce("", (str1, str2) -> str1 + "-" + str2); System.out.println(newStr2);
|
1 2
| Optional<Integer> length = Stream.of(str.split(" ")).map(s -> s.length()).reduce((len1, len2) -> len1 + len2); System.out.println("length:" + length.get());
|
5.max
求最大值
1 2
| Optional<String> maxValue = Stream.of(str.split(" ")).max((str1, str2) -> str1.length() - str2.length()); System.out.println("maxValue:" + maxValue.get());
|
6.findFirst
找第一个元素
1 2
| Optional<String> firstValue = Stream.of(str.split(" ")).findFirst(); System.out.println("firstValue:" + firstValue.get());
|
4. 惰性求值
惰性求值:终结没有调用的情况下,延迟方法不会执行。例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import java.util.stream.IntStream;
public class StreamDemo1 { public static void main(String[] args) { int[] numbers = {1, 2, 3}; int num2 = IntStream.of(numbers).map(StreamDemo1::doubleNum).sum(); System.out.println(num2); IntStream.of(numbers).map(StreamDemo1::doubleNum); }
private static int doubleNum(int i) { System.out.println("执行了乘以2"); return i * 2; } }
|
1 2 3 4 5
| > Task :StreamDemo1.main() 执行了乘以2 执行了乘以2 执行了乘以2 12
|
中间操作就是:返回stream的操作,如:map
终止操作就是:sum()
从输出结果可以看出doubleNum执行了3次
5.并行流
5.1 创建一个并行流
1 2 3 4 5 6 7 8
| public class ParallelStream { public static void main(String[] args) { IntStream.range(1,100).parallel().peek(ParallelStream::debugger).count(); } private static void debugger(int i) { System.out.println("debugger"+i); } }
|
输出:
1 2 3 4 5 6 7 8 9 10 11 12
| > Task :ParallelStream.main() debugger90 debugger81 debugger82 debugger83 debugger84 debugger85 debugger86 debugger65 debugger66 debugger67 ...
|
- 多次调用parallel / sequential,以最后一个为准,如:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class ParallelStream { public static void main(String[] args) { IntStream.range(1, 10) .parallel().peek(ParallelStream::debugger) .sequential().peek(ParallelStream::debugger2) .count(); }
private static void debugger2(int i) { System.err.println("debugger2: " + i); }
private static void debugger(int i) { System.out.println("debugger1: " + i); } }
|
5.2 并行流使用自己定义的线程池
原因:避免使用默认线程池,防止任务被阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| import java.util.concurrent.ForkJoinPool; import java.util.stream.IntStream;
public class ParallelStream { public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(20); forkJoinPool.submit(() -> IntStream.range(1, 10) .parallel().peek(ParallelStream::debugger) .count()); forkJoinPool.shutdown();
synchronized (forkJoinPool) { try { forkJoinPool.wait(); } catch (InterruptedException e) { e.printStackTrace(); } }
}
private static void debugger(int i) { System.out.println(Thread.currentThread().getName() + "debugger1: " + i); } }
|
6. Stream 流的运行机制
所有的操作都是链式调用,每个操作只会对每个元素操作一次;(是通过维护一个链表实现的。)具体实现:
- 每个中间操作都会返回一个新的流,每个流里面都会有一个SourceStage属性,所有流的SourceStage属性都指向同一个地方head【就是原始流的头部】;
- 如果一个中间操作之后还有中间操作,那么这个中间操作对应的流中nextStage属性就会执行下一个中间操作对应的流,否则就是null
注:parallel / sequential也是中间操作,但是他们呢不创建流,只是修改head 里的并行标志:parallel