Les flux parallèles en Java
Traitez les flux Java en parallèle pour gagner en vitesse — quand parallelStream aide et quand il nuit aux performances.
Un flux parallèle est le même pipeline de flux que vous écrivez habituellement, sauf que la JVM est autorisée à diviser la source en morceaux et à les traiter sur plusieurs threads. Le changement au niveau de l'appel est minime :
long total = nums.parallelStream().mapToLong(n -> heavy(n)).sum();
// ^^^^^^^^^^^^^^^^^ou :
long total = nums.stream().parallel().mapToLong(n -> heavy(n)).sum();La forme du pipeline, les opérations, le résultat — tout reste inchangé. Ce qui change, c'est qui l'exécute : au lieu d'un seul thread parcourant la source, plusieurs workers du ForkJoinPool commun (un par cœur CPU, moins un) se partagent le travail, et un coordinateur fusionne leurs résultats partiels. Lorsque le travail par élément est suffisamment lourd et que la source se divise proprement, le pipeline se termine en environ temps-réel / nombre-de-cœurs. Dans le cas contraire, le parallèle est plus lent que le séquentiel — et parfois incorrect. Ce chapitre explique comment faire la différence.
Ce que « parallèle » fait réellement
Un flux séquentiel fait passer un élément à travers le pipeline, puis le suivant. Un flux parallèle :
- Divise la source en sous-flux via le
Spliteratorde la source. Les tableaux,ArrayList,IntStream.rangeet les sources similaires se divisent proprement en O(1).LinkedList,Files.lines,Stream.iterateetStream.generatese divisent mal ou refusent de le faire. - Exécute la chaîne intermédiaire de chaque sous-flux sur un thread worker du pool commun.
- Fusionne les résultats partiels — pour
reduceetcollect, c'est le rôle ducombiner.
forEach dans un flux parallèle appelle votre Consumer depuis plusieurs threads simultanément et dans un ordre non spécifié. forEachOrdered préserve l'ordre de rencontre au prix d'une synchronisation. findFirst en parallèle est plus coûteux que findAny pour la même raison — il doit coordonner pour identifier la première correspondance.
Le contrat — ce que votre pipeline doit satisfaire
Le parallèle ne donne une réponse correcte que lorsque le pipeline respecte trois règles. Du code séquentiel qui les ignore par hasard fonctionne quand même ; du code parallèle qui les viole produit silencieusement des résultats erronés.
- Le réducteur doit être associatif.
f(f(a, b), c) == f(a, f(b, c)).+,*,max,min, l'union d'ensembles, la concaténation de listes — tous conviennent. La soustraction, la division, « première correspondance » et « ajout à liste avec ordre » ne conviennent pas. Si vous passez unBinaryOperatornon associatif àreduceouCollectors.reducing, le résultat dépend de la façon dont la JVM décide de diviser. - Le pipeline doit être sans état. Vos lambdas ne doivent pas lire ni écrire d'état mutable partagé. Un lambda qui capture et modifie un
ArrayListexterne, incrémente unint[]externe, ou utilise un compteur non atomique sera en situation de compétition en parallèle. - Le pipeline doit être exempt d'effets de bord. La journalisation est acceptable ; la persistance via un sink thread-safe est acceptable ; tout le reste est un bug qui attend qu'un worker l'intercale différemment.
Les collecteurs intégrés à Collectors satisfont les règles 1 à 3 par construction (lorsqu'ils sont utilisés comme documenté). Vos propres lambdas dans map, filter, reduce et peek sont ceux à surveiller.
Quand le parallèle aide (et quand il ne le fait pas)
Un flux parallèle ne l'emporte que lorsque le travail par élément est suffisamment important pour éclipser le coût de coordination — division, ordonnancement, fusion et comptabilité du framework. Un modèle mental approximatif :
- Source grande + travail par élément lié au CPU + fusion bon marché + source divisible = le parallèle gagne souvent. Le traitement d'image par pixel, l'analyse par enregistrement, le hachage par fichier — des cas classiques.
- Source petite = le séquentiel gagne. Le réveil du pool est plus coûteux que l'ensemble du calcul.
- Travail par élément léger = le séquentiel gagne.
nums.stream().mapToInt(Integer::intValue).sum()est plus rapide que son équivalentparallelStream()jusqu'à ce quenumscontienne des millions d'éléments ; à petite échelle, la surcharge du framework domine. - I/O bloquante par élément = les flux parallèles sont le mauvais outil. Le
ForkJoinPoolcommun est dimensionné pour le travail CPU ; un appel I/O bloquant monopolise un worker et affame tous les autres flux parallèles de la JVM (y compris ceux des bibliothèques). UtilisezCompletableFutureavec un exécuteur borné pour le fan-out I/O. - Source non divisible = le parallèle revient soit au séquentiel, soit se divise mal.
Files.lines,Stream.iterate,Stream.generateetLinkedList.stream()sont les mauvais exemples canoniques ; les tableaux,ArrayListetIntStream.rangesont les bons.
Le conseil honnête : privilégiez le séquentiel par défaut ; passez au parallèle uniquement lorsque vous avez une raison mesurée de le faire, avec des chiffres jmh ou des mesures de temps réel en main.
Opérations dont le comportement change en parallèle
Quelques opérations dont la signification change lorsque le pipeline passe en parallèle :
forEach— s'exécute depuis plusieurs threads, dans un ordre non spécifié. Si l'ordre importe, utilisezforEachOrdered(ce qui entraîne un coût de synchronisation).findFirst— doit coordonner entre les workers pour identifier la première correspondance dans l'ordre de rencontre. UtilisezfindAnysi vous ne vous souciez pas de laquelle gagne.limit/skip— bien définis sur les flux ordonnés, mais plus coûteux en parallèle car la JVM doit respecter l'ordre. Sur un flux parallèle où l'ordre n'importe pas,stream.parallel().unordered().limit(n)est moins cher.distinct/sorted— doivent coordonner entre les workers ; le tampon qu'ils maintiennent est partagé.reduceavec la surcharge à 3 arguments utilise lecombinerpour fusionner les sorties des workers. Avec la surcharge à 2 arguments, la JVM utilise l'identité deux fois plus l'accumulateur — même contrat, même règle d'associativité.collect— lesCollectorssont conçus pour être sûrs en parallèle ; la nuance est que le conteneur de résultat peut être unHashMapouArrayListordinaire, et la collecte parallèle coordonne en interne pour rester sûre. Vos collecteurs aval doivent respecter le contrat.
Le piège de l'état partagé, de manière concrète
Le bug le plus courant dans le code parallèle des débutants :
// WRONG -- looks fine, races in parallel
List<String> shouts = new ArrayList<>();
words.parallelStream().forEach(w -> shouts.add(w.toUpperCase()));ArrayList.add n'est pas thread-safe ; des workers concurrents perdent soit des éléments, en ajoutent en double, lèvent une ArrayIndexOutOfBoundsException, ou corrompent silencieusement la liste. La version correcte exprime le résultat comme la sortie du pipeline, et non comme un effet de bord :
List<String> shouts = words.parallelStream().map(String::toUpperCase).toList();toList(), comme tout autre collecteur ou terminal qui produit une valeur, est conçu pour une utilisation parallèle. Dès que vous atteignez un forEach qui mute une variable externe, vous avez quitté la voie sûre.
Si vous avez réellement besoin d'un sink thread-safe pour forEach, utilisez un ConcurrentLinkedQueue, AtomicLong, LongAdder, ou Collections.synchronizedList(...). Mais presque toujours, la bonne réponse est « n'utilisez pas forEach pour l'accumulation ; laissez le pipeline construire le résultat. »
ForkJoinPool et pourquoi c'est important
Par défaut, chaque flux parallèle de votre JVM partage le pool commun, dimensionné à Runtime.getRuntime().availableProcessors() - 1 threads workers. Cela a deux conséquences :
- Un flux parallèle de longue durée monopolise le pool. Tout autre flux parallèle — y compris ceux à l'intérieur des bibliothèques — s'y mettra en file d'attente.
- Un flux parallèle qui bloque (I/O, verrous,
Thread.sleep) monopolise un thread worker sans effectuer de travail, réduisant de moitié la taille effective du pool pendant l'attente.
Vous pouvez dédier un pool privé pour un pipeline ponctuel :
try (var pool = new java.util.concurrent.ForkJoinPool(4)) {
long total = pool.submit(() ->
nums.parallelStream().mapToLong(n -> heavy(n)).sum()
).get();
}C'est la bonne approche pour un calcul de longue durée que vous ne voulez pas partager avec le reste de la JVM. C'est toujours la mauvaise approche pour l'I/O bloquante — passez aux threads virtuels ou à une chaîne CompletableFuture explicite sur un exécuteur I/O borné.
Un exemple concret : accélération parallèle, piège de l'état partagé et bug d'associativité
Le programme ci-dessous mesure le séquentiel vs. le parallèle pour une somme IntStream liée au CPU, démontre la compétition d'état partagé avec forEach, montre la version correcte basée sur un collecteur, et contraste les réducteurs associatifs (Integer::sum) avec les non-associatifs ((a, b) -> a - b) en parallèle.
Ce qu'il faut retenir de l'exécution :
- La somme parallèle a produit le même résultat que la somme séquentielle et (sur toute machine multi-cœurs) s'est terminée en une fraction du temps réel. L'appel
heavypar élément est lié au CPU et la source (unint[]) se divise proprement — les deux ingrédients dont le parallèle a besoin. - Le
forEachqui mutaitbadSinka soit perdu des éléments, soit planté. Il n'existe aucun correctif qui ajoute unsynchronizedici sans rendre la version parallèle plus lente que la version séquentielle. Le correctif consiste à ne pas écrireforEachpour l'accumulation — utilisez un collecteur ou un terminal qui produit le résultat. Integer::sumest associatif ; la réduction parallèle a produit la même réponse que la réduction séquentielle. Le(a, b) -> a - bnon associatif a produit des réponses différentes en séquentiel et en parallèle car la JVM est libre de diviser et fusionner dans n'importe quel ordre associativement équivalent. Même code, deux réponses — le symptôme que tout bug de flux parallèle finit par produire.parallel().forEach(...)a affiché0..15dans un ordre non monotone ;parallel().forEachOrdered(...)les a affichés dans l'ordre au prix d'une synchronisation entre workers. Si votreforEachse soucie de l'ordre, vous le payez.- Le
ForkJoinPool(2)privé a exécuté le pipeline sur un pool dédié. Utilisez-le lorsque vous avez un travail de calcul de longue durée et que vous ne voulez pas le partager avec le pool commun du reste de la JVM. Ne l'utilisez pas comme rustine pour l'I/O bloquante — c'est un problème différent avec un outil différent.
Et ensuite
Vous pouvez maintenant raisonner sur chaque pipeline de flux : quand en écrire un, comment le construire, ce qui est paresseux, ce qui court-circuite, ce qui s'exécute en parallèle en toute sécurité, et ce qui ne le fait pas. Une abstraction centrale est encore sur la table — celle qui permet à un pipeline d'exprimer « cette valeur peut être absente » sans un seul null. Le prochain chapitre, Java Optional, couvre Optional<T> — ce que c'est, où l'API de flux laisse ses fils libres, et comment utiliser map, flatMap, orElse et ifPresent pour écrire du code null-safe par construction.