We will start with aggregate operations on streams and then discuss parallelism in streams, with example code.
You can perform intermediate and terminal operations on streams. Intermediate operations return a new stream and are lazily evaluated: they run only when a terminal operation is called.
persons.stream().filter(p -> p.getGender() == Gender.MALE).forEach(System.out::println);
In the snippet above, filter() doesn't start filtering immediately; it only creates a new stream. Filtering starts when the terminal operation, here forEach(), is called.
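The lazy behaviour described above can be made visible with a small log. This is only a sketch: the class name LazyEvaluationDemo and the sample names are made up for illustration, and the lambdas write to a log list purely to show the order of events (something you would normally avoid in a real pipeline).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original example code.
class LazyEvaluationDemo {

    // Records the order of events so that laziness becomes observable.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        List<String> names = Arrays.asList("Gaurav", "Jiya", "Neha");

        // Building the pipeline: the predicate is not invoked yet.
        Stream<String> shortNames = names.stream().filter(name -> {
            log.add("filter " + name);
            return name.length() <= 4;
        });
        log.add("pipeline built");

        // The terminal operation pulls the elements through filter().
        shortNames.forEach(name -> log.add("forEach " + name));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The first log entry is "pipeline built"; the "filter ..." entries appear only after forEach() has been called, interleaved with the "forEach ..." entries.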
There are many intermediate operations that you can perform on streams. Some of them are filter(), distinct(), sorted(), limit(), parallel(), sequential(), map() and flatMap().
filter() takes a Predicate functional interface as its argument, and the output stream contains only those elements that pass the Predicate's check. You can find a nice explanation of Predicates here.
// all the males
List<Person> allMales = persons.stream().filter(p -> p.getGender() == Gender.MALE).collect(Collectors.toList());
System.out.println(allMales);
map() is a mapper operation. It expects a Function functional interface as its argument. The purpose of a Function is to transform a value from one type to another (the other type could be the same).
// first names of all the persons
List<String> firstNames = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNames);
distinct() returns the unique elements and uses equals() under the hood to remove duplicates.
List<String> uniqueFirstNames = persons.stream().map(Person::getFirstName).distinct().collect(Collectors.toList());
System.out.println(uniqueFirstNames);
sorted() sorts the stream elements. It is a stateful operation.
List<Person> sortedByAge = persons.stream().sorted(Comparator.comparingInt(Person::getAge)).collect(Collectors.toList());
System.out.println(sortedByAge);
limit() truncates the stream to at most the given number of elements. It is helpful for ending infinite streams in a finite manner.
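As a minimal sketch of ending an infinite stream with limit(): Stream.iterate() keeps producing values forever, and limit(n) cuts the stream off after n elements. The class and method names below are hypothetical and chosen only for this illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original example code.
class LimitDemo {

    // Stream.iterate() never ends on its own; limit(n) truncates it to n elements.
    static List<Integer> firstPowersOfTwo(int n) {
        return Stream.iterate(1, x -> x * 2)
                .limit(n)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstPowersOfTwo(5)); // [1, 2, 4, 8, 16]
    }
}
```

Without the limit() call, collect() would never return, because the source never runs out of elements.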
Intermediate operations can be divided into two groups: stateless and stateful. Most intermediate operations are stateless, e.g. map() and filter(), but some are stateful, e.g. distinct(), sorted() and limit(), because they have to maintain state from previously seen elements.
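One way to see the difference is to trace elements on both sides of sorted() in a sequential stream. The sketch below assumes nothing beyond the JDK; the class name StatefulDemo is made up, and peek() is used here only for tracing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical demo class, not part of the original example code.
class StatefulDemo {

    // Logs each element before and after the stateful sorted() stage.
    static List<String> trace() {
        List<String> log = new ArrayList<>();
        Stream.of(3, 1, 2)
                .peek(n -> log.add("before sorted: " + n)) // stateless: one element at a time
                .sorted()                                  // stateful: must see every element first
                .peek(n -> log.add("after sorted: " + n))
                .forEach(n -> { });
        return log;
    }

    public static void main(String[] args) {
        trace().forEach(System.out::println);
    }
}
```

All three "before sorted" entries (3, 1, 2) are logged before the first "after sorted" entry, because sorted() cannot emit anything until it has buffered the whole input. A purely stateless pipeline would pass each element through all stages before touching the next one.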
There are many terminal operations, such as forEach(), reduce(), max(), min(), average(), collect(), findAny(), findFirst(), allMatch() and noneMatch().
forEach() takes a Consumer functional interface as its parameter and passes each element to it for consumption.
persons.stream().forEach(System.out::println);
max(), min(), average() operations
On an IntStream, average() returns an OptionalDouble, whereas max() and min() return an OptionalInt.
//average age of all persons
persons.stream().mapToInt(Person::getAge).average().ifPresent(System.out::println);
// max age from all persons
persons.stream().mapToInt(Person::getAge).max().ifPresent(System.out::println);
// min age from all persons
persons.stream().mapToInt(Person::getAge).min().ifPresent(System.out::println);
allMatch(), noneMatch() and anyMatch() check whether a condition is satisfied by all, none, or at least one of the stream's elements, respectively.
//age of all females in the group is less than 22
persons.stream().filter(p -> p.getGender() == Gender.FEMALE).allMatch(p -> p.getAge() < 22);
//not a single male's age is greater than 30
persons.stream().filter(p -> p.getGender() == Gender.MALE).noneMatch(p -> p.getAge() > 30);
//at least one person is older than 45
persons.stream().anyMatch(p -> p.getAge() > 45);
Reduction operations are those that produce a single value as their result. We have already seen some of them in the previous snippets, e.g. max(), min(), average() and sum(). Apart from these, Java 8 provides two more general-purpose operations: reduce() and collect().
// rangeClosed(1, 10) includes 10; range(1, 10) would stop at 9
int sumOfFirst10 = IntStream.rangeClosed(1, 10).reduce(0, Integer::sum);
System.out.println(sumOfFirst10);
collect() is a mutable reduction. Collectors has many useful factory methods, like toList() and groupingBy().
Collection<Person> persons = StreamSamples.getPersons();
List<String> firstNameOfPersons = persons.stream().map(Person::getFirstName).collect(Collectors.toList());
System.out.println(firstNameOfPersons);
Map<Integer, List<Person>> personByAge = persons.stream().collect(Collectors.groupingBy(Person::getAge));
System.out.println(personByAge);
Double averageAge = persons.stream().collect(Collectors.averagingInt(Person::getAge));
System.out.println(averageAge);
Long totalPersons = persons.stream().collect(Collectors.counting());
System.out.println(totalPersons);
IntSummaryStatistics personsAgeSummary = persons.stream().collect(Collectors.summarizingInt(Person::getAge));
System.out.println(personsAgeSummary);
String allPersonsFirstName = persons.stream().collect(Collectors.mapping(Person::getFirstName, Collectors.joining("#")));
System.out.println(allPersonsFirstName);
The result would look like this.
[Gaurav, Gaurav, Sandeep, Rami, Jiya, Rajesh, Rampal, Nisha, Neha, Ramesh, Parul, Sunil, Prekha, Neeraj]
{32=[Person [firstName=Rami, lastName=Aggarwal, gender=FEMALE, age=32, salary=12000]], 35=[Person [firstName=Rampal, lastName=Yadav, gender=MALE, age=35, salary=12000]], 20=[Person [firstName=Prekha, lastName=Verma, gender=FEMALE, age=20, salary=3600]], 21=[Person [firstName=Neha, lastName=Kapoor, gender=FEMALE, age=21, salary=5500]], 22=[Person [firstName=Jiya, lastName=Khan, gender=FEMALE, age=22, salary=4500], Person [firstName=Ramesh, lastName=Chander, gender=MALE, age=22, salary=2500]], 24=[Person [firstName=Sandeep, lastName=Shukla, gender=MALE, age=24, salary=5000]], 25=[Person [firstName=Parul, lastName=Mehta, gender=FEMALE, age=25, salary=8500], Person [firstName=Neeraj, lastName=Shah, gender=MALE, age=25, salary=33000]], 26=[Person [firstName=Nisha, lastName=Sharma, gender=FEMALE, age=26, salary=10000]], 27=[Person [firstName=Sunil, lastName=Kumar, gender=MALE, age=27, salary=6875]], 28=[Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000], Person [firstName=Gaurav, lastName=Mazra, gender=MALE, age=28, salary=10000]], 45=[Person [firstName=Rajesh, lastName=Kumar, gender=MALE, age=45, salary=55000]]}
27.142857142857142
14
IntSummaryStatistics{count=14, sum=380, min=20, average=27.142857, max=45}
Gaurav#Gaurav#Sandeep#Rami#Jiya#Rajesh#Rampal#Nisha#Neha#Ramesh#Parul#Sunil#Prekha#Neeraj
Once a terminal operation has completed on a stream, the stream is considered consumed and you can't use it again. You will get an exception if you try to start new operations on an already consumed stream.
Stream<String> stream = lines.stream();
stream.reduce((a, b) -> a.length() > b.length() ? a : b).ifPresent(System.out::println);
// below line will throw the exception
stream.forEach(System.out::println);
The exception would look like this.
Exception in thread "main" java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.sourceStageSpliterator(AbstractPipeline.java:279)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
at com.gauravbytes.java8.stream.StreamExceptionExample.main(StreamExceptionExample.java:18)
Streams provide a convenient way to execute operations in parallel. They use ForkJoinPool under the hood to run stream operations in parallel. You can use parallelStream(), or call parallel() on an already created stream, to perform a task in parallel. One thing to note: parallelism is not automatically faster than running the task serially; it pays off only when you have enough data and processor cores.
persons.parallelStream().filter(p -> p.getAge() > 30).collect(Collectors.toList());
Pass the java.util.concurrent.ForkJoinPool.common.parallelism property at JVM startup to increase parallelism in the common fork-join pool.
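The following sketch shows how you might check the effect of that property and confirm that a parallel pipeline gives the same result as a sequential one. The class name ParallelismDemo and the value 8 used in the usage note are arbitrary choices for illustration.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.stream.LongStream;

// Hypothetical demo class, not part of the original example code.
class ParallelismDemo {

    static long sequentialSum(long n) {
        return LongStream.rangeClosed(1, n).sum();
    }

    static long parallelSum(long n) {
        // parallel() makes the terminal operation run on the common fork-join pool.
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        // Reflects the common.parallelism system property if it was set at startup.
        System.out.println("Common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("Same result: " + (sequentialSum(1_000_000) == parallelSum(1_000_000)));
    }
}
```

After compiling, you could start it with, for example, java -Djava.util.concurrent.ForkJoinPool.common.parallelism=8 ParallelismDemo and compare the reported parallelism with a run that omits the property.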
ConcurrentMap<Integer, List<Person>> personByAgeConcurrent = persons.stream().collect(Collectors.groupingByConcurrent(Person::getAge));
System.out.println(personByAgeConcurrent);
Prevent interference, side-effects and stateful lambda/functions
Side effects
If a function does more than consume and/or return a value, such as modifying state, it is said to have side effects. Common examples of side effects are forEach() and mutable reduction using collect(). Java handles side effects in collect() in a thread-safe manner.
Interference
You should avoid interference in your lambdas/functions. It occurs when you modify the stream's source (the underlying collection) while pipeline operations are running.
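A minimal sketch of interference, assuming an ArrayList as the stream source: the lambda adds to the very list the stream is reading from. ArrayList detects this on a best-effort basis and throws ConcurrentModificationException. The class name InterferenceDemo and the sample data are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;

// Hypothetical demo class, not part of the original example code.
class InterferenceDemo {

    // Returns true if modifying the source during the pipeline was detected.
    static boolean interferes() {
        List<String> names = new ArrayList<>(Arrays.asList("Gaurav", "Jiya"));
        try {
            // Don't do this: the lambda modifies the list the stream is traversing.
            names.stream().forEach(name -> names.add(name + "-copy"));
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("Interference detected: " + interferes());
    }
}
```

If you need the extra elements, build them in a separate collection (for example with map() and collect()) and add them to the source after the pipeline has finished.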
Stateful Lambda expressions
A lambda expression is stateful if its result depends on any state that can change during execution. Avoid using stateful lambda expressions. You can read more here.
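To make the advice in this section concrete, here is a small sketch contrasting a pipeline that accumulates through a side effect with one that lets collect() do the accumulation. The class and method names are invented for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of the original example code.
class SideEffectDemo {

    // Discouraged: the lambda mutates a list that lives outside the pipeline.
    // With a parallel stream and a plain ArrayList this would be a data race.
    static List<Integer> evensWithSideEffect(List<Integer> numbers) {
        List<Integer> result = new ArrayList<>();
        numbers.stream().filter(n -> n % 2 == 0).forEach(result::add);
        return result;
    }

    // Preferred: collect() owns the accumulation, so it also works in parallel.
    static List<Integer> evensWithCollect(List<Integer> numbers) {
        return numbers.parallelStream()
                .filter(n -> n % 2 == 0)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
        System.out.println(evensWithSideEffect(numbers)); // [2, 4, 6]
        System.out.println(evensWithCollect(numbers));    // [2, 4, 6]
    }
}
```

Both methods return the same list here, but only the second one stays correct if the stream is switched to parallel, because its lambdas neither read nor write shared mutable state.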
I hope you find this post informative and helpful. You can find the example code for reduction, aggregate operations and stream creation on GitHub.