Introduction to Stream in Java 8

Contents

What is Stream

How to create a Stream

Operate on Stream

Advanced

Parallel streams

Lazy intermediate operations

Execution sequence of operations

Summary

What is Stream

To answer this question, let's look at how the JavaDoc describes it:

A sequence of elements supporting sequential and parallel aggregate operations

At first glance, this sounds like a data structure similar to a collection, one that stores a set of data and supports certain operations, but it is not. Let's start with a simple example:

// Create a stream
List<String> list = Arrays.asList("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
list.stream().filter(item -> item.length() < 3) // Filter out those with length greater than or equal to 3
        .distinct() // Remove duplicate elements
        .map(String::toLowerCase) // Convert each element
        .sorted() // sort
        .forEach(item -> System.out.print(item + " ")); // Traverse each element

// Output result: a1 a2 b1 b2 

This example shows that Stream enhances what collection objects can do. It is concerned with algorithms and computation, and focuses on convenient and efficient aggregate operations over collection objects. Combined with lambda expressions, it greatly improves programming efficiency and code readability.

A Stream is like an Iterator: it is one-way and cannot go back. Operating on the elements of a collection with a Stream is like a beer bottling line. Each empty bottle (an element of the collection) is carried to one station after another for filling, capping, labeling and rejection of defective bottles (the operations on the elements), and finally the bottles are packed into boxes and stacked in the warehouse.

 

How to create a Stream

There are three common ways to create streams.

1. Create a Stream using the static method of Stream

        // Mode 1:
        Stream<String> stream1 = Stream.of("A1", "B1", "A2", "B2", "A10", "B10");
        stream1.forEach(item -> System.out.print(item + " "));
        // Output result: A1 B1 A2 B2 A10 B10

        // Mode 2:
        Stream<Integer> stream2 = Stream.iterate(1, (x) -> x + 1).limit(5);
        stream2.forEach(item -> System.out.print(item + " "));
        // Output result: 1 2 3 4 5

        // Mode 3:
        Stream<Double> stream3 = Stream.generate(Math::random).limit(3);
        stream3.forEach(item -> System.out.print(item + " "));
        // Output result: 0.8974676207611573 0.7658795436834018 0.48552366426962845

iterate and generate are well suited to building large amounts of test data. Both produce infinite streams, so remember to combine them with limit (or another short-circuit operation); otherwise the terminal operation never finishes.
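
As a minimal sketch of bounding an infinite stream, the hypothetical helper powersOfTwo below (the name and class are assumptions for illustration, not part of the original examples) takes the first n values from Stream.iterate:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class InfiniteStreamDemo {
    // Hypothetical helper: the first n powers of two, taken from an infinite stream.
    static List<Integer> powersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2) // infinite: 1, 2, 4, 8, ...
                .limit(n)                    // bound the stream before the terminal operation
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(powersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```

Without the limit call, collect would keep pulling elements and never return.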

2. Use array conversion

String[] arr = new String[]{"A1", "B1", "A2", "B2", "A10", "B10"};
Stream<String> stream1 = Arrays.stream(arr);
stream1.forEach(item -> System.out.print(item + " "));
// Output result: A1 B1 A2 B2 A10 B10

In fact, the varargs Stream.of() method calls Arrays.stream() internally.

3. Create using the stream() method of the Collection interface

List<String> list = new ArrayList<>();
list.add("A1");
list.add("B1");
Stream<String> stream = list.stream();
stream.forEach(item -> System.out.print(item + " "));
// Output result: A1 B1

For common numeric operations, the JDK additionally provides IntStream, LongStream and DoubleStream. They are similar to Stream<Integer>, Stream<Long> and Stream<Double>, but avoid the extra boxing and unboxing and are therefore more efficient.
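
The following sketch shows one way to move between object streams and primitive streams. The class and method names are made up for illustration; mapToInt, sum, rangeClosed and boxed are standard JDK methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PrimitiveStreamDemo {
    // Sums string lengths through an IntStream, so no Integer objects are created.
    static int totalLength(List<String> words) {
        return words.stream()
                .mapToInt(String::length) // Stream<String> -> IntStream
                .sum();
    }

    // boxed() converts back to Stream<Integer> when objects are needed, e.g. for a List.
    static List<Integer> firstSquares(int n) {
        return IntStream.rangeClosed(1, n)
                .map(i -> i * i)
                .boxed()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(totalLength(Arrays.asList("A1", "B1", "A10"))); // 7
        System.out.println(firstSquares(4)); // [1, 4, 9, 16]
    }
}
```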

Other creation methods include:

  • java.io.BufferedReader.lines()
  • java.util.stream.IntStream.range()
  • java.nio.file.Files.walk()
  • Random.ints()
  • BitSet.stream()
  • Pattern.splitAsStream(java.lang.CharSequence)
  • JarFile.stream()

 

Operate on Stream

Using a stream usually involves three steps: obtain a data source, transform the data, and obtain the final result. In the example at the beginning of this article, list.stream() obtains the data source; filter, distinct, map and sorted transform the data; and forEach produces the final result.

The common operations of Stream can be classified as follows:

  • Intermediate operation: an intermediate operation returns a new Stream, which means a Stream can be followed by multiple intermediate operations chained one after another like train cars, such as stream.a().b(). Intermediate operations are lazy: calling these methods alone does not start the traversal of the stream.
  • Terminal operation: a stream can have only one terminal operation. Once it is executed, the stream is used up and can no longer be operated on, so it must be the last operation on the stream. The traversal of the stream really begins only when the terminal operation is executed.
  • Stateless: processing an element does not depend on other elements.
  • Stateful: processing an element depends on other elements.
  • Non-short-circuit operation: every element in the stream must be processed.
  • Short-circuit operation: only part of the elements need to be processed before execution ends.
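
To make the classification above more concrete, here is a small sketch (class and method names are assumptions for illustration). It chains two stateless intermediate operations and a short-circuit terminal operation on an infinite source, and counts how many elements were actually inspected:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // Returns how many elements were inspected before findFirst() stopped the traversal.
    static int inspectedBeforeFirstMatch() {
        AtomicInteger inspected = new AtomicInteger();
        Optional<Integer> first = Stream.iterate(1, x -> x + 1) // infinite source: 1, 2, 3, ...
                .peek(x -> inspected.incrementAndGet())          // stateless intermediate operation
                .filter(x -> x % 5 == 0)                         // stateless intermediate operation
                .findFirst();                                    // short-circuit terminal operation
        System.out.println("first match: " + first.get());
        return inspected.get();
    }

    public static void main(String[] args) {
        System.out.println("inspected: " + inspectedBeforeFirstMatch());
    }
}
```

Because findFirst is a short-circuit operation, the traversal ends at the first multiple of 5 even though the source is infinite. A non-short-circuit terminal operation such as forEach would never finish here.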

Let's demonstrate these common operations. Some of the demonstrations use a custom Person class, defined as follows (@Data is the Lombok annotation that generates the getters, setters and toString() used below):

@Data
class Person {
    private String name;
    private int age;

    public Person(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

filter

filter selects elements of the Stream. It tests each element, keeps those for which the test returns true, and forms a new Stream from them.

// filter
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.filter(item -> item.length() < 3).forEach(item -> System.out.print(item + " "));
// Output result: A1 A1 B1 A2 B2

map

Each element is transformed to form a new stream.

// map
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.map(String::toLowerCase).forEach(item -> System.out.print(item + " "));
// Equivalent to the following code:
//stream.map(item -> item.toLowerCase()).forEach(item -> System.out.print(item + " "));
        
// Output result: a1 a1 b1 a2 b2 a10 b10 b10

distinct

Removes duplicate elements, as determined by the elements' equals() method.

// distinct
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.distinct().forEach(item -> System.out.print(item + " "));

// Output result: A1 B1 A2 B2 A10 B10

limit

Takes the first n elements of the Stream. When n is greater than the number of elements in the Stream, a new Stream containing all the elements is returned.

// limit
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.limit(3).forEach(item -> System.out.print(item + " "));

// Output result: A1 A1 B1

skip

Skips the first n elements and forms a new Stream from the remaining ones. n may be greater than or equal to the number of elements in the original Stream, in which case an empty Stream is returned. skip can be combined with limit to implement paging.

// skip
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.skip(2).forEach(item -> System.out.print(item + " "));
// Output result: B1 A2 B2 A10 B10 B10

Stream<String> stream2 = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream2.skip(8).forEach(item -> System.out.print(item + " "));
// Empty stream, no output result
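
The paging idea mentioned above can be sketched as follows. The page helper and its 1-based page numbering are assumptions made for this illustration, not part of the Stream API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class PagingDemo {
    // Hypothetical helper: pageNumber is assumed to be 1-based.
    static <T> List<T> page(List<T> items, int pageNumber, int pageSize) {
        return items.stream()
                .skip((long) (pageNumber - 1) * pageSize) // drop the earlier pages
                .limit(pageSize)                          // keep at most one page
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
        System.out.println(page(items, 2, 3)); // [A2, B2, A10]
        System.out.println(page(items, 3, 3)); // [B10, B10]
        System.out.println(page(items, 4, 3)); // []
    }
}
```

Putting skip before limit matters: limit first would cut the stream to one page and skip would then discard part of that page.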

sorted

Sorts the elements in the stream. The sorted() method requires the elements to implement the Comparable interface; otherwise a ClassCastException may be thrown when the terminal operation runs. sorted(Comparator<? super T> comparator) lets you specify the sorting rule.

// sorted
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.sorted().forEach(item -> System.out.print(item + " "));
// Output result: A1 A1 A10 A2 B1 B10 B10 B2

Example of sorting custom objects:

List<Person> list = new ArrayList<>();
for (int i = 3; i > 0; i--) {
    list.add(new Person("Name" + i, i * 10));
}
System.out.println(list);
// Output result: [Person(name=Name3, age=30), Person(name=Name2, age=20), Person(name=Name1, age=10)]

list.stream().sorted(Comparator.comparingInt(Person::getAge)).forEach(item -> System.out.println(item));
// Output result:
// Person(name=Name1, age=10)
// Person(name=Name2, age=20)
// Person(name=Name3, age=30)

forEach

forEach accepts a lambda expression and executes it for each element. The previous examples use it for printing. Note that forEach is a terminal operation: once it has run, the stream is consumed, and calling it again throws an exception. For a sequential stream, forEach still boils down to a traditional loop over the elements, but it saves a lot of code and reads cleanly.

// forEach
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
stream.forEach(item -> System.out.print(item + " "));
// Output result: A1 A1 B1 A2 B2 A10 B10 B10

// Calling forEach again throws an exception, because forEach is a terminal operation. Once executed, the stream is consumed.
// java.lang.IllegalStateException: stream has already been operated upon or closed
stream.forEach(item -> System.out.print(item + " "));

peek

Unlike forEach, peek is an intermediate operation. After the operation, a new stream is returned and the operation can continue.

List<Person> list = new ArrayList<>();
for (int i = 3; i > 0; i--) {
    list.add(new Person("Name" + i, i * 10));

}
System.out.println(list);
// Output result: [Person(name=Name3, age=30), Person(name=Name2, age=20), Person(name=Name1, age=10)]

Stream<Person> stream = list.stream();
stream.peek(item -> item.setName(item.getName().toLowerCase()))
        .forEach(item -> item.setAge(item.getAge() + 1));
System.out.println(list);
// Output results: [Person(name=name3, age=31), Person(name=name2, age=21), Person(name=name1, age=11)]

reduce

reduce combines the elements of a Stream. It takes a starting value (the seed) and combines it with the first, second, ... and nth elements of the Stream in turn, according to a combining rule (a BinaryOperator). In this sense, string concatenation and the sum, min, max and average of numeric values are all special cases of reduce.

// reduce
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
String str = stream.reduce("", String::concat);
System.out.println(str);
// Output result: A1A1B1A2B2A10B10B10

IntStream stream2 = IntStream.of(1, 2, 3, 4, 5);
int sum = stream2.reduce(0, (a, b) -> a + b);
System.out.println(sum);
// Output result: 15
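
As a complementary sketch (class and method names are illustrative assumptions), reduce also has a one-argument form without a starting value. It returns an Optional, because an empty stream has nothing to combine:

```java
import java.util.Optional;
import java.util.stream.Stream;

class ReduceDemo {
    // With a starting value, reduce always returns a plain result.
    static int sum(Stream<Integer> numbers) {
        return numbers.reduce(0, Integer::sum);
    }

    // Without a starting value, the result is an Optional that is empty for an empty stream.
    static Optional<Integer> max(Stream<Integer> numbers) {
        return numbers.reduce(Integer::max);
    }

    public static void main(String[] args) {
        System.out.println(sum(Stream.of(1, 2, 3, 4, 5)));  // 15
        System.out.println(max(Stream.of(3, 9, 4)));         // Optional[9]
        System.out.println(max(Stream.<Integer>empty()));    // Optional.empty
    }
}
```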

collect

collect is a very useful terminal operation that converts the elements of a stream into another form. It takes a Collector as its parameter. Java 8 ships with many built-in collectors in the Collectors class, so most common operations can use them directly.

Converting to a List, Set, or Map is the most common operation:

// Convert to List
Stream<String> stream = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
List<String> list = stream.filter(item -> item.length() > 2).collect(Collectors.toList());
System.out.println(list);
// Output: [A10, B10, B10]

// Convert to Set
Stream<String> stream2 = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
Set<String> set = stream2.collect(Collectors.toSet());
System.out.println(set);
// Output (the iteration order of the resulting Set is not guaranteed): [A1, B2, A10, A2, B10, B1]

// Convert to Map
Stream<String> stream3 = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
Map<String, Integer> map = stream3.distinct().collect(Collectors.toMap(item -> item, String::length));
System.out.println(map);
// {A10=3, B2=2, A1=2, B10=3, A2=2, B1=2}
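
The distinct() call in the Map example matters: the two-argument Collectors.toMap throws an IllegalStateException when two elements produce the same key. The sketch below (class and method names are assumptions for illustration) shows that failure and the three-argument overload, which takes a merge function to resolve duplicates:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ToMapDemo {
    // Returns true if the two-argument toMap rejects the input because of duplicate keys.
    static boolean failsOnDuplicates(List<String> items) {
        try {
            items.stream().collect(Collectors.toMap(item -> item, String::length));
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    // Counts occurrences by merging the values of duplicate keys with Integer::sum.
    static Map<String, Integer> countOccurrences(List<String> items) {
        return items.stream()
                .collect(Collectors.toMap(item -> item, item -> 1, Integer::sum));
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A1", "A1", "B1");
        System.out.println(failsOnDuplicates(items));            // true
        System.out.println(countOccurrences(items).get("A1"));   // 2
        System.out.println(countOccurrences(items).get("B1"));   // 1
    }
}
```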

Collectors.groupingBy can be used for grouping. It is also a commonly used function:

// Group by string length
Stream<String> stream = Stream.of("A", "A1", "B", "A2", "B1", "A11", "B11", "B11");
Map<Integer,List<String>> map = stream.collect(Collectors.groupingBy(String::length));
System.out.println(map);
// Output: {1=[A, B], 2=[A1, A2, B1], 3=[A11, B11, B11]}
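
groupingBy also accepts a downstream collector that is applied to each group. As a sketch (the class and method names are illustrative), the following counts the strings of each length instead of listing them, and uses a TreeMap so that the keys come out sorted:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class GroupingDemo {
    // Groups by string length and counts the members of each group.
    static Map<Integer, Long> countByLength(List<String> items) {
        return items.stream()
                .collect(Collectors.groupingBy(String::length, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A", "A1", "B", "A2", "B1", "A11", "B11", "B11");
        System.out.println(countByLength(items)); // {1=2, 2=3, 3=3}
    }
}
```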

Collectors can also compute statistics such as the maximum, minimum, average and sum. There are collectors for each single statistic, and the summarizing collectors (such as summarizingInt) return several statistics at once:

// summingInt (sum) and averagingInt (average) can be used for single-item statistics
Stream<String> stream = Stream.of("A", "A1", "A12", "A123", "A1234");
System.out.println(stream.collect(Collectors.summingInt(String::length)));
// Output: 15

// summarizingInt can output more statistical information
Stream<String> stream2 = Stream.of("A", "A1", "A12", "A123", "A1234");
IntSummaryStatistics s = stream2.collect(Collectors.summarizingInt(String::length));
System.out.println(s);
// Output: IntSummaryStatistics{count=5, sum=15, min=1, average=3.000000, max=5}
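
The examples above show summingInt and summarizingInt. For completeness, here is a sketch of two other single-statistic collectors, averagingInt and maxBy (the class and method names are assumptions for illustration):

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

class StatisticsDemo {
    // Average string length; averagingInt yields 0.0 for an empty stream.
    static double averageLength(List<String> items) {
        return items.stream().collect(Collectors.averagingInt(String::length));
    }

    // Longest string; maxBy returns an Optional because the stream may be empty.
    static Optional<String> longest(List<String> items) {
        return items.stream().collect(Collectors.maxBy(Comparator.comparingInt(String::length)));
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A", "A1", "A12", "A123", "A1234");
        System.out.println(averageLength(items)); // 3.0
        System.out.println(longest(items));       // Optional[A1234]
    }
}
```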

In an earlier example we used reduce to concatenate the elements of a string stream into one string. Collectors can do the same thing:

// Splice elements directly
Stream<String> stream = Stream.of("A", "A1", "A12", "A123", "A1234");
String s = stream.collect(Collectors.joining());
System.out.println(s);
// Output: AA1A12A123A1234

// Join using the specified delimiter, with an optional prefix and suffix
Stream<String> stream2 = Stream.of("A", "A1", "A12", "A123", "A1234");
String s2 = stream2.collect(Collectors.joining("_","[","]"));
System.out.println(s2);
// Output: [A_A1_A12_A123_A1234]

match

Stream has three match methods:

  • anyMatch: as long as one element in the Stream matches the incoming predicate, it returns true; otherwise, it returns false
  • allMatch: all elements in the Stream conform to the incoming predicate and return true; otherwise, return false
  • noneMatch: if none of the elements in the Stream matches the incoming predicate, return true; otherwise, return false

The match methods are short-circuit operations: as soon as the final result can be determined, it is returned immediately. For example, as soon as anyMatch finds a matching element, it returns true and does not examine the remaining elements. See the examples below:

// anyMatch
Stream<String> stream = Stream.of("A", "A1", "A12");
boolean b = stream.anyMatch(item -> {
    System.out.println("Current element:" + item);
    return item.length() >= 2;
});
System.out.println("Final result:" + b);
//Current element:A
//Current element:A1
//Final result:true

// allMatch
Stream<String> stream2 = Stream.of("A", "A1", "A12");
boolean b2 = stream2.allMatch(item -> {
    System.out.println("Current element:" + item);
    return item.length() >= 2;
});
System.out.println("Final result:" + b2);
//Current element:A
//Final result:false

// noneMatch
Stream<String> stream3 = Stream.of("A", "A1", "A12");
boolean b3 = stream3.noneMatch(item -> {
    System.out.println("Current element:" + item);
    return item.length() >= 2;
});
System.out.println("Final result:" + b3);
//Current element:A
//Current element:A1
//Final result:false

Advanced

Parallel streams

Streams can be either sequential or parallel. The previous examples all use sequential streams, which are processed by a single thread. A parallel stream processes data with multiple threads, which can improve processing speed considerably when the amount of data is large. Behind the scenes it uses the general-purpose concurrency framework ForkJoinPool, obtained through the static method ForkJoinPool.commonPool(). The default parallelism of this pool depends on the number of CPU cores of the machine (by default, the number of available processors minus one).

// My machine has an 8-core CPU
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism());
//Output: 7

You can change the parallelism with the following JVM parameter:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=5

You can use the parallelStream() method of Collection to create a parallel stream directly, or use the parallel() method to convert a sequential stream into a parallel one. The following example clearly shows multiple threads taking part in processing a parallel stream.

List<String> list = Arrays.asList("A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9");
list.parallelStream()
        .forEach(item -> System.out.println(Thread.currentThread().getName() + " forEach:" + item));
// Output (thread names and order vary between runs):
//main forEach:A6
//main forEach:A5
//ForkJoinPool.commonPool-worker-1 forEach:A3
//ForkJoinPool.commonPool-worker-3 forEach:A4
//ForkJoinPool.commonPool-worker-2 forEach:A8
//ForkJoinPool.commonPool-worker-1 forEach:A1
//ForkJoinPool.commonPool-worker-4 forEach:A7
//ForkJoinPool.commonPool-worker-3 forEach:A9
//main forEach:A2

Some Stream API methods are only meaningful for parallel streams, such as forEachOrdered. It works like forEach and traverses the elements; the difference is that on a parallel stream forEach cannot guarantee the order in which elements are traversed, while forEachOrdered can.

Stream<String> stream1 = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
Stream<String> stream2 = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");
Stream<String> stream3 = Stream.of("A1", "A1", "B1", "A2", "B2", "A10", "B10", "B10");

// For a sequential stream, forEach traverses the elements in stream order
stream1.forEach(item -> System.out.print(item + " "));
// Output result: A1 A1 B1 A2 B2 A10 B10 B10

// A parallel stream is processed by multiple threads, and forEach does not guarantee the traversal order
stream2.parallel().forEach(item -> System.out.print(item + " "));
// Output result (varies between runs): A10 B2 B10 B10 A2 A1 B1 A1

// On a parallel stream, forEachOrdered guarantees the traversal order
stream3.parallel().forEachOrdered(item -> System.out.print(item + " "));
// Output result: A1 A1 B1 A2 B2 A10 B10 B10
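
The ordering problem above comes from forEach running its side effect on whichever thread happens to process an element. As a sketch (class and method names are assumptions), collecting the results instead of printing them keeps the encounter order of an ordered source such as a List, even when the stream is parallel:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelOrderDemo {
    // The elements may be mapped on several threads, but collect(toList())
    // assembles the result in the encounter order of the source list.
    static List<String> lowerCaseInParallel(List<String> items) {
        return items.parallelStream()
                .map(String::toLowerCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9");
        System.out.println(lowerCaseInParallel(items)); // [a1, a2, a3, a4, a5, a6, a7, a8, a9]
    }
}
```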

Lazy intermediate operations

As mentioned earlier, intermediate operations are lazy: calling these methods alone does not start the stream traversal. The traversal starts only when a terminal operation is executed.

Stream<String> stream = Stream.of("A1", "A2", "A3");
stream.peek(System.out::println);
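// No output: no terminal operation is called, so peek is never executed

Stream<String> stream2 = Stream.of("B1", "B2", "B3");
stream2.peek(System.out::println).count();
// Output on Java 8 (since Java 9, count() may skip the pipeline when the size can be computed directly from the source, in which case nothing is printed):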
//B1
//B2
//B3
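
Laziness can also be observed without printing. The sketch below (names are illustrative assumptions) counts peek invocations before and after a terminal operation; it uses collect, which always traverses the elements:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazinessDemo {
    // Returns {calls before the terminal operation, calls after it}.
    static int[] callsBeforeAndAfter() {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = Stream.of("A1", "A2", "A3")
                .peek(item -> calls.incrementAndGet()); // intermediate: nothing runs yet
        int before = calls.get();
        List<String> result = pipeline.collect(Collectors.toList()); // terminal: traversal starts
        int after = calls.get();
        System.out.println(result + " before=" + before + " after=" + after);
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        callsBeforeAndAfter(); // [A1, A2, A3] before=0 after=3
    }
}
```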

Execution sequence of operations

Consider a chain such as stream.filter().map().forEach(). Reading the code intuitively, one might expect the elements to be traversed twice, once for filter and once for map, and then a third time for forEach. Is that so? Let's take an example.

Stream<String> stream = Stream.of("A1", "A2", "B1", "B2");
stream.map(item -> {
    System.out.println("map:" + item);
    return item.toLowerCase();
}).filter(item -> {
    System.out.println("filter:" + item);
    return item.startsWith("a");
}).forEach(item -> {
    System.out.println("forEach:" + item);
});
//Expected output by intuition: four lines of map first, then four lines of filter, and finally two lines of forEach

//Actual output:
//map:A1
//filter:a1
//forEach:a1
//map:A2
//filter:a2
//forEach:a2
//map:B1
//filter:b1
//map:B2
//filter:b2

In this example, we can see that each element moves vertically along the operation chain: it passes through all the operations in turn, and only then does the next element start. This can reduce the number of operations actually performed in some scenarios. In the example above, a total of 10 operations were performed (4 map, 4 filter, 2 forEach). Let's slightly adjust the order of the chain and put filter in front of map:

Stream<String> stream = Stream.of("A1", "A2", "B1", "B2");
stream.filter(item -> {
    System.out.println("filter:" + item);
    return item.startsWith("A");
}).map(item -> {
    System.out.println("map:" + item);
    return item.toLowerCase();
}).forEach(item -> {
    System.out.println("forEach:" + item);
});

//Output:
//filter:A1
//map:A1
//forEach:a1
//filter:A2
//map:A2
//forEach:a2
//filter:B1
//filter:B2

The total number of operations was reduced to 8.

The limit operation is to get the first n elements, and the skip operation is to skip n elements. They look similar, but they have different effects on the number of operations.

Stream<String> stream = Stream.of("A1", "A2", "A3", "A4");
stream.limit(2)
        .map(item -> {
            System.out.println("map:" + item);
            return item.toLowerCase();
        }).forEach(item -> System.out.println("forEach:" + item));
//Output:
//map:A1
//forEach:a1
//map:A2
//forEach:a2

Stream<String> stream2 = Stream.of("A1", "A2", "A3", "A4");
stream2.map(item -> {
    System.out.println("map:" + item);
    return item.toLowerCase();
}).limit(2)
 .forEach(item -> System.out.println("forEach:" + item));
//Output:
//map:A1
//forEach:a1
//map:A2
//forEach:a2

Whether limit is before or after map(), the output is the same, and the number of operations is determined by the parameter n of limit. Take another look at skip:

Stream<String> stream = Stream.of("A1", "A2", "A3", "A4");
stream.skip(2)
        .map(item -> {
            System.out.println("map:" + item);
            return item.toLowerCase();
        }).forEach(item -> System.out.println("forEach:" + item));
//Output:
//map:A3
//forEach:a3
//map:A4
//forEach:a4

Stream<String> stream2 = Stream.of("A1", "A2", "A3", "A4");
stream2.map(item -> {
    System.out.println("map:" + item);
    return item.toLowerCase();
}).skip(2)
 .forEach(item -> System.out.println("forEach:" + item));
//Output:
//map:A1
//map:A2
//map:A3
//forEach:a3
//map:A4
//forEach:a4

It can be seen that the position of skip affects the final number of operations, but both limit and skip follow the logic of "elements move vertically along the operation chain". There is a special case, however, when limit and skip meet the sorting operation sorted:

Stream<String> stream = Stream.of("A1", "A2", "A3", "A4");
stream.sorted((a, b) -> {
    System.out.println("Currently comparing:" + a + " and " + b);
    return a.compareTo(b);
}).limit(2)
        .forEach(item -> System.out.println("forEach:" + item));
//Output:
//Currently comparing:A2 and A1
//Currently comparing:A3 and A2
//Currently comparing:A4 and A3
//forEach:A1
//forEach:A2

It can be seen that although there is a limit(2), the sorted operation still sorts all elements. sorted is a stateful operation: it has to see every element before it can pass the first one downstream.
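
The effect on the operations placed before sorted can be measured with a counter. In this sketch (class and method names are assumptions for illustration), map runs only twice when limit(2) follows it directly, but four times when sorted sits in between:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class SortedBarrierDemo {
    // map -> limit(2): the traversal stops after two elements.
    static int mapCallsWithLimitOnly() {
        AtomicInteger calls = new AtomicInteger();
        Stream.of("A1", "A2", "A3", "A4")
                .map(item -> { calls.incrementAndGet(); return item.toLowerCase(); })
                .limit(2)
                .forEach(item -> { });
        return calls.get();
    }

    // map -> sorted -> limit(2): sorted needs every element, so map runs for all four.
    static int mapCallsWithSortedBeforeLimit() {
        AtomicInteger calls = new AtomicInteger();
        Stream.of("A1", "A2", "A3", "A4")
                .map(item -> { calls.incrementAndGet(); return item.toLowerCase(); })
                .sorted()
                .limit(2)
                .forEach(item -> { });
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(mapCallsWithLimitOnly());         // 2
        System.out.println(mapCallsWithSortedBeforeLimit()); // 4
    }
}
```

When the business logic allows it, filtering or limiting before sorted keeps the amount of work done by the earlier operations and by the sort itself as small as possible.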

Summary

  • Stream enhances collection objects, focusing on convenient and efficient aggregate operations on them;
  • Stream operations are divided into intermediate and terminal operations. Intermediate operations are lazy; only a terminal operation triggers their execution;
  • A stream can be consumed only once, and only one terminal operation can be performed on it;
  • Parallel streams can improve efficiency when processing large amounts of data;
  • A well-designed order of operations in the chain can improve efficiency.

Tags: Java Back-end java8

Posted by shergold on Tue, 17 May 2022 11:09:16 +0300