W3docs

Les flux parallèles en Java

Traitez les flux Java en parallèle pour gagner en vitesse — quand parallelStream aide et quand il nuit aux performances.

Un flux parallèle est le même pipeline de flux que vous écrivez habituellement, sauf que la JVM est autorisée à diviser la source en morceaux et à les traiter sur plusieurs threads. Le changement au niveau de l'appel est minime :

long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
//              ^^^^^^^^^^^^^^^^^

ou :

long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();

La forme du pipeline, les opérations, le résultat — tout reste inchangé. Ce qui change, c'est qui l'exécute : au lieu d'un seul thread parcourant la source, plusieurs workers du ForkJoinPool commun (un par cœur CPU, moins un) se partagent le travail, et un coordinateur fusionne leurs résultats partiels. Lorsque le travail par élément est suffisamment lourd et que la source se divise proprement, le pipeline se termine en environ temps-réel / nombre-de-cœurs. Dans le cas contraire, le parallèle est plus lent que le séquentiel — et parfois incorrect. Ce chapitre explique comment faire la différence.

Ce que « parallèle » fait réellement

Un flux séquentiel fait passer un élément à travers le pipeline, puis le suivant. Un flux parallèle :

  1. Divise la source en sous-flux via le Spliterator de la source. Les tableaux, ArrayList, IntStream.range et les sources similaires se divisent proprement en O(1). LinkedList, Files.lines, Stream.iterate et Stream.generate se divisent mal ou refusent de le faire.
  2. Exécute la chaîne intermédiaire de chaque sous-flux sur un thread worker du pool commun.
  3. Fusionne les résultats partiels — pour reduce et collect, c'est le rôle du combiner.

forEach dans un flux parallèle appelle votre Consumer depuis plusieurs threads simultanément et dans un ordre non spécifié. forEachOrdered préserve l'ordre de rencontre au prix d'une synchronisation. findFirst en parallèle est plus coûteux que findAny pour la même raison — il doit coordonner pour identifier la première correspondance.

Le contrat — ce que votre pipeline doit satisfaire

Le parallèle ne donne une réponse correcte que lorsque le pipeline respecte trois règles. Du code séquentiel qui les ignore par hasard fonctionne quand même ; du code parallèle qui les viole produit silencieusement des résultats erronés.

  1. Le réducteur doit être associatif. f(f(a, b), c) == f(a, f(b, c)). +, *, max, min, l'union d'ensembles, la concaténation de listes — tous conviennent. La soustraction, la division, « première correspondance » et « ajout à liste avec ordre » ne conviennent pas. Si vous passez un BinaryOperator non associatif à reduce ou Collectors.reducing, le résultat dépend de la façon dont la JVM décide de diviser.
  2. Le pipeline doit être sans état. Vos lambdas ne doivent pas lire ni écrire d'état mutable partagé. Un lambda qui capture et modifie un ArrayList externe, incrémente un int[] externe, ou utilise un compteur non atomique sera en situation de compétition en parallèle.
  3. Le pipeline doit être exempt d'effets de bord. La journalisation est acceptable ; la persistance via un sink thread-safe est acceptable ; tout le reste est un bug qui attend qu'un worker l'intercale différemment.

Les collecteurs intégrés à Collectors satisfont les règles 1 à 3 par construction (lorsqu'ils sont utilisés comme documenté). Vos propres lambdas dans map, filter, reduce et peek sont ceux à surveiller.

Quand le parallèle aide (et quand il ne le fait pas)

Un flux parallèle ne l'emporte que lorsque le travail par élément est suffisamment important pour éclipser le coût de coordination — division, ordonnancement, fusion et comptabilité du framework. Un modèle mental approximatif :

  • Source grande + travail par élément lié au CPU + fusion bon marché + source divisible = le parallèle gagne souvent. Le traitement d'image par pixel, l'analyse par enregistrement, le hachage par fichier — des cas classiques.
  • Source petite = le séquentiel gagne. Le réveil du pool est plus coûteux que l'ensemble du calcul.
  • Travail par élément léger = le séquentiel gagne. nums.stream().mapToInt(Integer::intValue).sum() est plus rapide que son équivalent parallelStream() jusqu'à ce que nums contienne des millions d'éléments ; à petite échelle, la surcharge du framework domine.
  • I/O bloquante par élément = les flux parallèles sont le mauvais outil. Le ForkJoinPool commun est dimensionné pour le travail CPU ; un appel I/O bloquant monopolise un worker et affame tous les autres flux parallèles de la JVM (y compris ceux des bibliothèques). Utilisez CompletableFuture avec un exécuteur borné pour le fan-out I/O.
  • Source non divisible = le parallèle revient soit au séquentiel, soit se divise mal. Files.lines, Stream.iterate, Stream.generate et LinkedList.stream() sont les mauvais exemples canoniques ; les tableaux, ArrayList et IntStream.range sont les bons.

Le conseil honnête : privilégiez le séquentiel par défaut ; passez au parallèle uniquement lorsque vous avez une raison mesurée de le faire, avec des chiffres jmh ou des mesures de temps réel en main.

Opérations dont le comportement change en parallèle

Quelques opérations dont la signification change lorsque le pipeline passe en parallèle :

  • forEach — s'exécute depuis plusieurs threads, dans un ordre non spécifié. Si l'ordre importe, utilisez forEachOrdered (ce qui entraîne un coût de synchronisation).
  • findFirst — doit coordonner entre les workers pour identifier la première correspondance dans l'ordre de rencontre. Utilisez findAny si vous ne vous souciez pas de laquelle gagne.
  • limit / skip — bien définis sur les flux ordonnés, mais plus coûteux en parallèle car la JVM doit respecter l'ordre. Sur un flux parallèle où l'ordre n'importe pas, stream.parallel().unordered().limit(n) est moins cher.
  • distinct / sorted — doivent coordonner entre les workers ; le tampon qu'ils maintiennent est partagé.
  • reduce avec la surcharge à 3 arguments utilise le combiner pour fusionner les sorties des workers. Avec la surcharge à 2 arguments, la JVM utilise l'identité deux fois plus l'accumulateur — même contrat, même règle d'associativité.
  • collect — les Collectors sont conçus pour être sûrs en parallèle ; la nuance est que le conteneur de résultat peut être un HashMap ou ArrayList ordinaire, et la collecte parallèle coordonne en interne pour rester sûre. Vos collecteurs aval doivent respecter le contrat.

Le piège de l'état partagé, de manière concrète

Le bug le plus courant dans le code parallèle des débutants :

// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));

ArrayList.add n'est pas thread-safe ; des workers concurrents perdent soit des éléments, en ajoutent en double, lèvent une ArrayIndexOutOfBoundsException, ou corrompent silencieusement la liste. La version correcte exprime le résultat comme la sortie du pipeline, et non comme un effet de bord :

List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();

toList(), comme tout autre collecteur ou terminal qui produit une valeur, est conçu pour une utilisation parallèle. Dès que vous atteignez un forEach qui mute une variable externe, vous avez quitté la voie sûre.

Si vous avez réellement besoin d'un sink thread-safe pour forEach, utilisez un ConcurrentLinkedQueue, AtomicLong, LongAdder, ou Collections.synchronizedList(...). Mais presque toujours, la bonne réponse est « n'utilisez pas forEach pour l'accumulation ; laissez le pipeline construire le résultat. »

ForkJoinPool et pourquoi c'est important

Par défaut, chaque flux parallèle de votre JVM partage le pool commun, dimensionné à Runtime.getRuntime().availableProcessors() - 1 threads workers. Cela a deux conséquences :

  • Un flux parallèle de longue durée monopolise le pool. Tout autre flux parallèle — y compris ceux à l'intérieur des bibliothèques — s'y mettra en file d'attente.
  • Un flux parallèle qui bloque (I/O, verrous, Thread.sleep) monopolise un thread worker sans effectuer de travail, réduisant de moitié la taille effective du pool pendant l'attente.

Vous pouvez dédier un pool privé pour un pipeline ponctuel :

try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
    long total = pool.submit(() ->
        nums.parallelStream().mapToLong(n -> heavy(n)).sum()
    ).get();
}

C'est la bonne approche pour un calcul de longue durée que vous ne voulez pas partager avec le reste de la JVM. C'est toujours la mauvaise approche pour l'I/O bloquante — passez aux threads virtuels ou à une chaîne CompletableFuture explicite sur un exécuteur I/O borné.

Un exemple concret : accélération parallèle, piège de l'état partagé et bug d'associativité

Le programme ci-dessous mesure le séquentiel vs. le parallèle pour une somme IntStream liée au CPU, démontre la compétition d'état partagé avec forEach, montre la version correcte basée sur un collecteur, et contraste les réducteurs associatifs (Integer::sum) avec les non-associatifs ((a, b) -> a - b) en parallèle.

java— editable, runs on the server

Ce qu'il faut retenir de l'exécution :

  • La somme parallèle a produit le même résultat que la somme séquentielle et (sur toute machine multi-cœurs) s'est terminée en une fraction du temps réel. L'appel heavy par élément est lié au CPU et la source (un int[]) se divise proprement — les deux ingrédients dont le parallèle a besoin.
  • Le forEach qui mutait badSink a soit perdu des éléments, soit planté. Il n'existe aucun correctif qui ajoute un synchronized ici sans rendre la version parallèle plus lente que la version séquentielle. Le correctif consiste à ne pas écrire forEach pour l'accumulation — utilisez un collecteur ou un terminal qui produit le résultat.
  • Integer::sum est associatif ; la réduction parallèle a produit la même réponse que la réduction séquentielle. Le (a, b) -> a - b non associatif a produit des réponses différentes en séquentiel et en parallèle car la JVM est libre de diviser et fusionner dans n'importe quel ordre associativement équivalent. Même code, deux réponses — le symptôme que tout bug de flux parallèle finit par produire.
  • parallel().forEach(...) a affiché 0..15 dans un ordre non monotone ; parallel().forEachOrdered(...) les a affichés dans l'ordre au prix d'une synchronisation entre workers. Si votre forEach se soucie de l'ordre, vous le payez.
  • Le ForkJoinPool(2) privé a exécuté le pipeline sur un pool dédié. Utilisez-le lorsque vous avez un travail de calcul de longue durée et que vous ne voulez pas le partager avec le pool commun du reste de la JVM. Ne l'utilisez pas comme rustine pour l'I/O bloquante — c'est un problème différent avec un outil différent.

Et ensuite

Vous pouvez maintenant raisonner sur chaque pipeline de flux : quand en écrire un, comment le construire, ce qui est paresseux, ce qui court-circuite, ce qui s'exécute en parallèle en toute sécurité, et ce qui ne le fait pas. Une abstraction centrale est encore sur la table — celle qui permet à un pipeline d'exprimer « cette valeur peut être absente » sans un seul null. Le prochain chapitre, Java Optional, couvre Optional<T> — ce que c'est, où l'API de flux laisse ses fils libres, et comment utiliser map, flatMap, orElse et ifPresent pour écrire du code null-safe par construction.

Pratique

Pratique
`nums.parallelStream().reduce(0, (a, b) -> a - b)` retourne une réponse différente de son équivalent `stream()`. Pourquoi ?
`nums.parallelStream().reduce(0, (a, b) -> a - b)` retourne une réponse différente de son équivalent `stream()`. Pourquoi ?
Was this page helpful?