Understanding Java Streams and Parallel Execution: A Case Study

Introduction

Java Streams provide a powerful abstraction for processing collections of data. With the introduction of parallel streams, it became easier to parallelize work for better performance, especially for computationally intensive or I/O-bound tasks. However, understanding how streams and parallelism work under the hood is crucial to using them effectively.

In this blog post, I’ll share a real-world scenario where incorrect usage of Stream.parallel() led to unexpected results, and how we corrected it to get the parallel behaviour we expected.

The Problematic Code

Here’s the initial implementation, where the goal was to fetch interactions from multiple repositories in parallel:

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class FetchInteractionsServiceFout {
    private final IVRInteractionRepository ivrInteractionRepository;
    private final WebInteractionRepository webInteractionRepository;
    private final AppInteractionRepository appInteractionRepository;

    public FetchInteractionsServiceFout(IVRInteractionRepository ivrInteractionRepository,
                                        WebInteractionRepository webInteractionRepository,
                                        AppInteractionRepository appInteractionRepository) {
        this.ivrInteractionRepository = ivrInteractionRepository;
        this.webInteractionRepository = webInteractionRepository;
        this.appInteractionRepository = appInteractionRepository;
    }

    public List<Interaction> fetchInteractions(List<InteractionType> interactionTypes) {
        return Stream.of(
                        interactionTypes.contains(InteractionType.IVR) ? faultSafe(ivrInteractionRepository::fetchInteractions) : Stream.<Interaction>of(),
                        interactionTypes.contains(InteractionType.WEB) ? faultSafe(webInteractionRepository::fetchInteractions) : Stream.<Interaction>of(),
                        interactionTypes.contains(InteractionType.APP) ? faultSafe(appInteractionRepository::fetchInteractions) : Stream.<Interaction>of())
                .parallel()
                .flatMap(s -> s)
                .toList();
    }

    // Swallows failures from a single source so one failing repository does not break the whole fetch.
    private Stream<Interaction> faultSafe(Supplier<Stream<Interaction>> supplier) {
        try {
            return supplier.get();
        } catch (RuntimeException e) {
            return Stream.of();
        }
    }
}

Observations

  • Expectation: The method should fetch interactions from all repositories in parallel.
  • Reality: Inspecting the thread names showed that the calls were executed sequentially on the calling thread, not in parallel.

Root Cause

The issue is that the heavy work never reached the parallel pipeline: the arguments to Stream.of(...) are evaluated eagerly on the calling thread, and parallelism only applies to operations that run inside the stream pipeline when the terminal operation executes. In the problematic code:

  1. The three streams were created while the arguments to Stream.of(...) were being evaluated.
  2. Because faultSafe(...) invokes supplier.get() immediately, the heavy operations (fetchInteractions) executed during that argument evaluation, sequentially and before .parallel() could have any effect.
  3. The .parallel() call and subsequent operations like flatMap only flattened the already-constructed streams, leaving no heavy work to parallelize (the sketch after this list reproduces the effect in isolation).
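
A minimal, self-contained sketch (with a hypothetical slowFetch method standing in for a repository call) reproduces this: both log lines report the calling thread, because slowFetch runs while the arguments to Stream.of are evaluated, before the parallel pipeline exists.

import java.util.List;
import java.util.stream.Stream;

public class EagerArgumentDemo {
    // Hypothetical stand-in for a repository call; logs the thread it runs on.
    static Stream<String> slowFetch(String source) {
        System.out.println("fetching " + source + " on " + Thread.currentThread().getName());
        return Stream.of(source + "-1", source + "-2");
    }

    public static void main(String[] args) {
        // slowFetch("IVR") and slowFetch("WEB") execute right here, on the main thread,
        // while the arguments to Stream.of are evaluated; .parallel() comes too late to help.
        List<String> result = Stream.of(slowFetch("IVR"), slowFetch("WEB"))
                .parallel()
                .flatMap(s -> s)
                .toList();
        System.out.println(result);
    }
}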

The Correct Approach

To make the network calls run in parallel, the logic was refactored to defer the heavy operations into the stream pipeline, where they can benefit from parallelism:

import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class FetchInteractionsServiceKlopt {
    private final IVRInteractionRepository ivrInteractionRepository;
    private final WebInteractionRepository webInteractionRepository;
    private final AppInteractionRepository appInteractionRepository;

    public FetchInteractionsServiceKlopt(IVRInteractionRepository ivrInteractionRepository,
                                         WebInteractionRepository webInteractionRepository,
                                         AppInteractionRepository appInteractionRepository) {
        this.ivrInteractionRepository = ivrInteractionRepository;
        this.webInteractionRepository = webInteractionRepository;
        this.appInteractionRepository = appInteractionRepository;
    }

    public List<Interaction> fetchInteractions(List<InteractionType> interactionTypes) {
        return interactionTypes.stream()
                .parallel()
                .flatMap(interactionType ->
                        faultSafe(() -> switch (interactionType) {
                            case IVR -> ivrInteractionRepository.fetchInteractions();
                            case WEB -> webInteractionRepository.fetchInteractions();
                            case APP -> appInteractionRepository.fetchInteractions();
                        }))
                .toList();
    }

    private Stream<Interaction> faultSafe(Supplier<Stream<Interaction>> supplier) {
        try {
            return supplier.get();
        } catch (RuntimeException e) {
            return Stream.of();
        }
    }
}

Key Changes

  1. Defer Heavy Operations to the Pipeline:
    • The repository calls (fetchInteractions) were moved to the flatMap operation, which is part of the pipeline.
  2. Parallelism Placement:
    • The .parallel() call lets each interactionType be processed concurrently on the common ForkJoinPool.
  3. Lazy Evaluation:
    • Heavy operations are triggered only when the terminal operation (toList) executes the pipeline, so they run inside the parallel pipeline rather than before it (see the sketch below).
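
To illustrate that last point with plain strings instead of the repositories: nothing is printed while the pipeline is being built; the map work only runs when toList() is invoked, and may then be spread over several threads.

import java.util.List;
import java.util.stream.Stream;

public class LazyPipelineDemo {
    public static void main(String[] args) {
        Stream<String> pipeline = Stream.of("IVR", "WEB", "APP")
                .parallel()
                .map(type -> {
                    System.out.println("processing " + type + " on " + Thread.currentThread().getName());
                    return type.toLowerCase();
                });

        System.out.println("pipeline built, nothing executed yet");

        // Only now do the map operations run, distributed over the common ForkJoinPool.
        List<String> result = pipeline.toList();
        System.out.println(result);
    }
}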

Why the Correct Code Works

Stream Creation vs. Stream Operations

  1. Stream Creation:
    • In the problematic code, the heavy operations (fetchInteractions) were executed while the arguments to Stream.of(...) were being evaluated, before the pipeline (and therefore .parallel()) even existed.
    • That argument evaluation happens sequentially on the calling thread, so the network calls were made one after another.
  2. Stream Operations:
    • In the correct code, .parallel() is applied to interactionTypes.stream(), and the heavy operations (fetchInteractions) are part of the flatMap operation inside the parallel pipeline.
    • This allows the heavy operations to be distributed across the worker threads of the common ForkJoinPool (and the calling thread itself).

How Parallel Streams Work

  • Parallelism Scope: .parallel() marks the entire pipeline as parallel (the last .parallel()/.sequential() call before the terminal operation wins). It cannot affect work that happens before the pipeline exists, such as evaluating the arguments of Stream.of(...).
  • Lazy Evaluation: Streams are lazily evaluated. Operations like flatMap or map execute only when a terminal operation (e.g., toList) is invoked.
  • ForkJoinPool: Parallel streams use the common ForkJoinPool to split and process tasks concurrently. The tasks are distributed based on the pool’s parallelism level (see the sketch below).
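
A quick way to see what a parallel stream has to work with is to inspect the common pool directly; the following is a small sketch, and the numbers depend on your machine. The java.util.concurrent.ForkJoinPool.common.parallelism system property mentioned in the comment is the standard JDK switch for resizing the common pool.

import java.util.concurrent.ForkJoinPool;
import java.util.stream.IntStream;

public class CommonPoolDemo {
    public static void main(String[] args) {
        // By default the common pool's parallelism is roughly availableProcessors() - 1;
        // it can be overridden with -Djava.util.concurrent.ForkJoinPool.common.parallelism=N
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        System.out.println("available processors:    " + Runtime.getRuntime().availableProcessors());

        // Each element may be handled by a different worker thread, or by the calling thread itself.
        IntStream.rangeClosed(1, 4)
                .parallel()
                .forEach(i -> System.out.println("task " + i + " on " + Thread.currentThread().getName()));
    }
}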

Lessons Learned

  1. Stream Creation Is Eager and Sequential:
    • Any work done while constructing a stream’s source (e.g., evaluating the arguments to Stream.of(...)) runs on the calling thread and does not involve parallelism.
  2. Defer Heavy Operations:
    • Place heavy or time-consuming operations (e.g., network calls) inside the stream pipeline to benefit from parallelism.
  3. Parallelism Placement Matters:
    • Apply .parallel() at the right point in the stream pipeline to ensure operations run concurrently.
  4. Verify Parallel Execution:
    • Use logs or a profiler (e.g., printing Thread.currentThread().getName()) to confirm that tasks are actually distributed across threads, as in the sketch below.
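
As a minimal sketch of that last point, logging the current thread name inside the pipeline (here with peek, on plain strings) makes the difference visible: a sequential run prints only main, while a genuinely parallel run typically mixes in names like ForkJoinPool.commonPool-worker-1.

import java.util.List;
import java.util.stream.Stream;

public class ThreadNameCheck {
    public static void main(String[] args) {
        List<String> result = Stream.of("IVR", "WEB", "APP")
                .parallel()
                // Handy for ad-hoc debugging: shows which thread handles each element.
                .peek(type -> System.out.println(type + " handled on " + Thread.currentThread().getName()))
                .map(String::toLowerCase)
                .toList();
        System.out.println(result);
    }
}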
