Introduction
Java Streams provide a powerful abstraction for processing collections of data. With the introduction of parallel streams, it became easier to write parallelized code for better performance, especially for computationally intensive or I/O-bound tasks. However, understanding how streams and parallelism work under the hood is crucial to use them effectively.
In this blog, I’ll share a real-world scenario where incorrect usage of Stream.parallel() led to unexpected results, and how we corrected it for optimal performance.
The Problematic Code
Here’s the initial implementation, where the goal was to fetch interactions from multiple repositories in parallel:
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class FetchInteractionsServiceFout {

    private final IVRInteractionRepository ivrInteractionRepository;
    private final WebInteractionRepository webInteractionRepository;
    private final AppInteractionRepository appInteractionRepository;

    public FetchInteractionsServiceFout(IVRInteractionRepository ivrInteractionRepository,
                                       WebInteractionRepository webInteractionRepository,
                                       AppInteractionRepository appInteractionRepository) {
        this.ivrInteractionRepository = ivrInteractionRepository;
        this.webInteractionRepository = webInteractionRepository;
        this.appInteractionRepository = appInteractionRepository;
    }

    public List<Interaction> fetchInteractions(List<InteractionType> interactionTypes) {
        // Each faultSafe(...) call is an argument to Stream.of, so Java runs it (and the
        // repository call inside it) on the calling thread before Stream.of is invoked.
        return Stream.of(
                        interactionTypes.contains(InteractionType.IVR) ? faultSafe(ivrInteractionRepository::fetchInteractions) : Stream.<Interaction>of(),
                        interactionTypes.contains(InteractionType.WEB) ? faultSafe(webInteractionRepository::fetchInteractions) : Stream.<Interaction>of(),
                        interactionTypes.contains(InteractionType.APP) ? faultSafe(appInteractionRepository::fetchInteractions) : Stream.<Interaction>of())
                .parallel()      // the repository calls have already completed at this point
                .flatMap(s -> s) // only flattens the already-fetched streams
                .toList();
    }

    // Returns the supplier's stream, or an empty stream if the call throws a RuntimeException.
    private Stream<Interaction> faultSafe(Supplier<Stream<Interaction>> supplier) {
        try {
            return supplier.get();
        } catch (RuntimeException e) {
            return Stream.of();
        }
    }
}
Observations
- Expectation: The method should fetch interactions from all repositories in parallel.
- Reality: Inspecting the thread names showed that the calls were made sequentially, not in parallel.
Root Cause
The issue arises because stream creation happens sequentially on the calling thread. Parallelism applies only to the operations that the stream pipeline executes when its terminal operation runs. Java evaluates every argument of a method call before invoking the method, so in the problematic code:
- The outer stream was created using Stream.of(...), whose three arguments were evaluated one after another on the calling thread.
- The heavy operations (fetchInteractions) were executed during that argument evaluation, before .parallel() was even called.
- .parallel() and subsequent operations like flatMap only processed the already-constructed streams, leaving no heavy operations to parallelize.
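The effect can be reproduced without any real repositories. The sketch below is a minimal, self-contained approximation. It assumes a hypothetical slowFetch method that sleeps for 200 ms in place of fetchInteractions and records which thread ran each call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

public class EagerArgumentsDemo {

    // Hypothetical stand-in for a repository's fetchInteractions():
    // records the current thread and sleeps to mimic network latency.
    static Stream<String> slowFetch(String source, List<String> callThreads) {
        callThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Stream.of(source + "-1", source + "-2");
    }

    // Same shape as the problematic code: the slow calls are arguments to Stream.of.
    static List<String> fetchEagerly(List<String> callThreads) {
        // Java evaluates all three arguments, left to right on the calling thread,
        // before Stream.of runs, so parallel() comes too late to help.
        return Stream.of(
                        slowFetch("IVR", callThreads),
                        slowFetch("WEB", callThreads),
                        slowFetch("APP", callThreads))
                .parallel()
                .flatMap(s -> s)
                .toList();
    }

    public static void main(String[] args) {
        List<String> callThreads = new ArrayList<>();
        long start = System.nanoTime();
        List<String> result = fetchEagerly(callThreads);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("result:       " + result);
        System.out.println("caller:       " + Thread.currentThread().getName());
        System.out.println("call threads: " + callThreads); // the caller's name, three times
        System.out.println("elapsed ms:   " + elapsedMs);   // roughly 3 x 200 ms
    }
}
```

Running main should list the caller's thread name for all three calls, which matches the observation that the calls were made sequentially.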
The Correct Approach
To ensure that the network calls were made in parallel, the logic was refactored to move the heavy operations into the stream pipeline, where they can benefit from parallelism:
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class FetchInteractionsServiceKlopt {

    private final IVRInteractionRepository ivrInteractionRepository;
    private final WebInteractionRepository webInteractionRepository;
    private final AppInteractionRepository appInteractionRepository;

    public FetchInteractionsServiceKlopt(IVRInteractionRepository ivrInteractionRepository,
                                        WebInteractionRepository webInteractionRepository,
                                        AppInteractionRepository appInteractionRepository) {
        this.ivrInteractionRepository = ivrInteractionRepository;
        this.webInteractionRepository = webInteractionRepository;
        this.appInteractionRepository = appInteractionRepository;
    }

    public List<Interaction> fetchInteractions(List<InteractionType> interactionTypes) {
        return interactionTypes.stream()
                .parallel()
                // The repository call now happens inside flatMap: it runs only when toList()
                // executes the pipeline, and each interaction type can go to a different thread.
                .flatMap(interactionType ->
                        faultSafe(() -> switch (interactionType) {
                            case IVR -> ivrInteractionRepository.fetchInteractions();
                            case WEB -> webInteractionRepository.fetchInteractions();
                            case APP -> appInteractionRepository.fetchInteractions();
                        }))
                .toList();
    }

    // Returns the supplier's stream, or an empty stream if the call throws a RuntimeException.
    private Stream<Interaction> faultSafe(Supplier<Stream<Interaction>> supplier) {
        try {
            return supplier.get();
        } catch (RuntimeException e) {
            return Stream.of();
        }
    }
}
Key Changes
- Defer Heavy Operations to the Pipeline:
  - The repository calls (fetchInteractions) were moved into the flatMap operation, which is part of the pipeline.
- Parallelism Placement:
  - The .parallel() call is now applied to interactionTypes.stream(), so each interactionType can be processed on a separate thread.
- Lazy Evaluation:
  - Heavy operations are triggered only when the terminal operation (toList) executes the pipeline, so they benefit from parallelism.
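To see the refactored shape in isolation, the following sketch mirrors fetchInteractions from FetchInteractionsServiceKlopt using hypothetical stand-ins: a Source enum instead of InteractionType, and a slowFetch method that sleeps for 200 ms instead of calling a repository. Only the structure of the pipeline comes from the article.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

public class DeferredPipelineDemo {

    // Hypothetical stand-in for InteractionType.
    enum Source { IVR, WEB, APP }

    // Hypothetical stand-in for a repository call. It may run on several threads
    // at once, so thread names go into a concurrent set.
    static Stream<String> slowFetch(Source source, Set<String> callThreads) {
        callThreads.add(Thread.currentThread().getName());
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Stream.of(source + "-1", source + "-2");
    }

    static List<String> fetchInPipeline(List<Source> sources, Set<String> callThreads) {
        return sources.stream()
                .parallel()
                // The slow call is an operation of the pipeline: it runs when toList()
                // executes, and the parallel stream can hand each source to a different thread.
                .flatMap(source -> slowFetch(source, callThreads))
                .toList(); // keeps encounter order: IVR, WEB, APP
    }

    public static void main(String[] args) {
        Set<String> callThreads = ConcurrentHashMap.newKeySet();
        long start = System.nanoTime();
        List<String> result = fetchInPipeline(List.of(Source.values()), callThreads);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("result:       " + result);
        System.out.println("call threads: " + callThreads);
        System.out.println("elapsed ms:   " + elapsedMs);
    }
}
```

On a multi-core machine, the thread set usually contains the caller's name plus one or more ForkJoinPool.commonPool-worker-N names. The elapsed time is usually closer to 200 ms than to 600 ms, although the exact split depends on the common pool's parallelism.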
Why the Correct Code Works
Stream Creation vs. Stream Operations
- Stream Creation:
  - In the problematic code, the streams were created using Stream.of(...), and the heavy operations (fetchInteractions) were executed while its arguments were evaluated, before the .parallel() call.
  - That argument evaluation happens once, sequentially, on the calling thread, so the network calls were made one after another.
- Stream Operations:
  - In the correct code, .parallel() is applied to interactionTypes.stream(), and the heavy operations (fetchInteractions) are part of the flatMap operation within the parallel pipeline.
  - This allows the heavy operations to be distributed across the worker threads of the common ForkJoinPool, with the calling thread also taking part.
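The same distinction between creation and operations means the fix does not depend on iterating over interactionTypes specifically. As an alternative sketch (not the article's code), a stream can still be built with Stream.of(...) as long as its elements are cheap Supplier objects and the expensive call happens in an operation. Here fetch is a hypothetical stand-in that only counts invocations.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class SupplierStreamDemo {

    // Hypothetical stand-in for a repository call; it only counts invocations.
    static Stream<String> fetch(String source, AtomicInteger calls) {
        calls.incrementAndGet();
        return Stream.of(source + "-1", source + "-2");
    }

    static Stream<String> buildPipeline(AtomicInteger calls) {
        // Stream creation: the elements are lambdas, so no fetch happens yet.
        // The type witness is needed because lambdas alone do not tell Java the element type.
        return Stream.<Supplier<Stream<String>>>of(
                        () -> fetch("IVR", calls),
                        () -> fetch("WEB", calls),
                        () -> fetch("APP", calls))
                .parallel()
                // Stream operation: each supplier is invoked here, inside the parallel pipeline.
                .flatMap(Supplier::get);
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Stream<String> pipeline = buildPipeline(calls);
        System.out.println("calls before toList(): " + calls.get()); // 0
        List<String> result = pipeline.toList();
        System.out.println("calls after toList():  " + calls.get()); // 3
        System.out.println(result); // [IVR-1, IVR-2, WEB-1, WEB-2, APP-1, APP-2]
    }
}
```

The counter stays at 0 until toList() runs. This is the practical difference between doing work while creating a stream and doing it as a stream operation.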
How Parallel Streams Work
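- Parallelism Scope: .parallel() sets the execution mode of the whole pipeline, not just the operations that follow it. If a pipeline calls both .parallel() and .sequential(), the last call wins. Work that happens before the pipeline exists, such as evaluating the arguments to Stream.of(...), is outside its scope.
- Lazy Evaluation: Streams are lazily evaluated. Intermediate operations like flatMap or map execute only when a terminal operation (e.g., toList) is invoked.
- ForkJoinPool: Parallel streams use the common ForkJoinPool to split and process tasks concurrently. How many tasks run at once depends on the pool's parallelism level, which by default is typically one less than the number of available processors. The calling thread also executes part of the work.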
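The parallelism scope and the common pool's parallelism level can be checked directly. This sketch uses only standard JDK APIs (BaseStream.isParallel and ForkJoinPool.getCommonPoolParallelism). The printed thread names and parallelism level depend on the machine.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;
import java.util.stream.Stream;

public class PipelineModeDemo {

    // The last parallel()/sequential() call decides the mode of the whole pipeline.
    static boolean parallelThenSequential() {
        return Stream.of(1, 2, 3).parallel().sequential().isParallel();
    }

    static boolean sequentialThenParallel() {
        return Stream.of(1, 2, 3).sequential().parallel().isParallel();
    }

    // map() is written before parallel(), yet it also runs in parallel mode,
    // because the mode applies to the pipeline as a whole.
    static Set<String> threadsSeenByEarlierMap() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        IntStream.range(0, 100_000)
                .map(i -> {
                    threads.add(Thread.currentThread().getName());
                    return i;
                })
                .parallel()
                .sum();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("parallel().sequential(): " + parallelThenSequential()); // false
        System.out.println("sequential().parallel(): " + sequentialThenParallel()); // true
        System.out.println("threads used by map():   " + threadsSeenByEarlierMap());
        System.out.println("common pool parallelism: " + ForkJoinPool.getCommonPoolParallelism());
    }
}
```

On a multi-core machine, the set printed for map() usually contains ForkJoinPool.commonPool-worker-N names as well as main, even though map() appears before parallel() in the chain.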
Lessons Learned
- Stream Creation Is Sequential:
  - Creating a stream (e.g., using Stream.of(...)) does not involve parallelism; any method calls in its arguments run immediately on the calling thread.
- Defer Heavy Operations:
  - Place heavy or time-consuming operations (e.g., network calls) inside the stream pipeline, for example in map or flatMap, to benefit from parallelism.
- Parallelism Placement Matters:
  - Apply .parallel() to a stream whose elements describe the work still to be done (here, the interaction types), not to a stream of results that have already been computed.
- Verify Parallel Execution:
  - Use tools or logs (e.g., Thread.currentThread().getName()) to confirm that tasks are distributed across threads as expected. Expect names like ForkJoinPool.commonPool-worker-1 alongside the calling thread's name.
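Thread names alone can be misleading, because the calling thread also takes part in a parallel stream. One way to strengthen the check is to log start and end times as well, then test whether any two calls overlapped. This sketch assumes you can wrap each call in a helper. The names traced, CallTrace, and anyOverlap are hypothetical and not part of any library.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

public class CallTracing {

    // One traced call: which thread ran it and when it started and finished.
    record CallTrace(String label, String thread, long startNanos, long endNanos) {}

    // Hypothetical helper: runs the call and records a CallTrace for it, even if it throws.
    static <T> T traced(String label, Supplier<T> call, List<CallTrace> traces) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            traces.add(new CallTrace(label, Thread.currentThread().getName(), start, System.nanoTime()));
        }
    }

    // True if at least two traced calls were running at the same time.
    static boolean anyOverlap(List<CallTrace> traces) {
        for (int i = 0; i < traces.size(); i++) {
            for (int j = i + 1; j < traces.size(); j++) {
                CallTrace a = traces.get(i);
                CallTrace b = traces.get(j);
                if (a.startNanos() < b.endNanos() && b.startNanos() < a.endNanos()) {
                    return true;
                }
            }
        }
        return false;
    }

    static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Synchronized because parallel tasks append to it concurrently.
        List<CallTrace> traces = Collections.synchronizedList(new ArrayList<>());
        List<String> result = List.of("IVR", "WEB", "APP").stream()
                .parallel()
                .map(source -> traced(source, () -> { sleep(200); return source; }, traces))
                .toList();
        System.out.println(result);
        traces.forEach(System.out::println);
        System.out.println("overlapping calls: " + anyOverlap(traces));
    }
}
```

Overlapping intervals are direct evidence that calls ran concurrently. Distinct thread names only show that more than one thread was involved.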