Streszczenie: Parallelizm i programowanie asynchroniczne ze Stream API oraz CompletableFuture cz. I
Poniższy wpis jest streszczeniem wykładu Venkata Subramaniama z konferencji Devoxx 2018 na teamat parallelizmu i asynchronicznośći w Javie. Wykład można znaleźć pod tym linkiem: https://www.youtube.com/watch?v=0hQvWIdwnw4.
Cały wykład, czyli cześć pierwsza i druga trwają 3h14minut więc postaram się napisać co udało mi się zapamiętać i zrozumieć z tej lekcji. Venkat to świetny prelegent, który sprawia, że dość ciężkie zagadnienia “wchodzą” z łatwością do głowy.
Pierwsza część obejmuje parallelizm
Oba pojęcia czyli parallel i programowanie asynchroniczne różnią się od siebie, nie jest to to samo.
Parallelizacja – rozdzielamy jakieś zadania na mniejsze (fork), dzieląc je między dostępne wątki, na końcu łączymy (join) i zbieramy rezultat
Asynchronicznośc – operacje wykonywane są asynchronicznie, nie trzeba czekać na dostarczenie wszystkich wyników operacji z każdego wątku, główna operacja (praca) nadal trwa
Venkat porównał to do imprezy. Parallel to kiedy mówimy jednej osobie, żeby przyniosła pizzę a drugiej aby przyniosła napoje, rozdzielamy więc pracę i kiedy obie osoby przyniosą swoje składniki rozpoczynamy zabawę. Asynchroniczność to kiedy mówimy jednej osobie żeby przyniosła pizzę a drugiej aby przyniosła napoje z tym, że kiedy nie ma obu osób możemy już się bawić i tańczyć, kiedy pierwsza osoba przyniesie pizzę, jemy pizzę i dalej się bawimy, następnie druga osoba przynosi napoje, cały czas trwa impreza. Nie musimy czekać na każdy składnik aby ją zacząć.
Luźne notatki:
Streams - addiction
Collection Pipeline Pattern
//functional style - functional composition
stream() - internal iterator (tak to można nazwać)
Functional composition -> Collection Pipeline
//imperative style has accidental complexity
//functional style has less complexity and it is also easier to parallelize
Parallelizowany stream można utworzyć na dwa sposoby:
- parallelStream() – kiedy jesteśmy kreatorem streama
- stream().parallel() – kiedy nie jesteśmy kreatorem tylko np. dostajemy sekwencyjny stream jako wynik jakiejś funkcji, argument itp.
Venkat podkreślił, że kod sekwencyjny ma bardzo różniącą się strukturę od kodu wielowątkowego. Kiedy używamy synchronized, bardzo nadruszamy sobie strukturę kodu, do tego trzeba zapewne łapać wyjątki związane z wątkami. Kiedy używamy streamów struktura sekwencyjnego kodu jest identyczna ze strukturą kodu wielowątkowego.
- sequential() – niweluje parallelizm w streamie
Jeśli mamy oba zarówno parallel() jak i sequential() w streamie to zastosowana jest zasada last wins, ostatnim jest ten najbliżej operacji terminalnej.
W kolejnych “slajdach” Venkat pokazał różnice miedzy Streamami a Streamami Reaktywnymi.
We solve one set of problems only to create a new set of problems
Tak o kolejnych generacjach wielowątkowości mówił Venkat Subramaniam
- Java 1 – Threads – wiemy jak wygląda praca z wątkami na najniższym poziomie i jak wygląda kod
- Java 5 – ExcecutorServices – duże uproszczenie, wrzucanie zadań do worka i czekanie na odpowiedź, z tym że mogły wywoływać Pool deadlock czyli sytuację, że za duże żądania cały czas dzielone na mniejsze przez wątki executora mogły wywoływać deadlock, wątki po podzieleniu zadania przekazywały zadanie dalej i czekały na kolejne zadanie do podziału, tylko, że czasami mogło zabraknąć wątków które mogłby wykonywać pracę tych podzielonych części i następował deadlock
- Java 7 – ForkJoinPool – wątki dzielą probemy ale już nie czekają bez końca na kolejne dzielenie zadań tylko wykonują pracę innego wątku, nie marnuje się wątku tylko na czekanie, jest to bardziej efektywne wykorzystanie czasu procesora, jest to tak zwany work stealing – podbieranie pracy innego wątku kiedy nasz wątek za długo jest bezczynny
Jak wiadomo parallel nie zachowuje kolejności wywołań.
Niektóre metody są właściwie uporządkowane (inherently ordered) czyli muszą mieć zachowaną jakąś kolejność działań. Inne natomiast mogą nie być uporządkowane lecz mieć uporządkowane odpowiedniki (ordered counterpart). Aby zachować i wielowątkowość w streamie i kolejność wywołań, np. odczyt można użyć .forEachOrdered() jako operację terminalną na parallel streamie. Nadal jest to wielowątkowe wywołanie, bo zadania wykonują się równolegle jednak muszą poczekać z dokończeniem pracy na poprzednika. Metoda .forEachOrdered() gwarantuje uporządkowanie tylko jeśli przekazany miał już wcześniej możliwość bycia uporządkowanym np. tak jak Lista. Na Secie nie zadziała bo ta metoda nie wyczaruje tego uporządkowania, ona ma je tylko zachować.
- Parallel and filter (OK)
- Parallel and map (OK)
- Parallel and reduce (BARDZO NIE OK), to znaczy jeśli seed czyli jak Venkat mówił NIE initial value a Identity Value (czyli wartość przekazana do operacji nie zmieniająca wyniku operacji np. w mnożeniu to liczba 1: x * 1 = x, w dodawaniu 0: x + 0 = x) to wtedy się wszystko zgadza i jest OK ale jeśli nie jest Identity to wyniki bardzo się rozjeżdżają bo każdy wątek bierze sobie tą wartość początkową i nie patrzy na to co robią inne wątki, zatem jeśli w streamie sumującym ustawimy wartość początkową na np. 30 to każdy wątek na początku sobie weźmie 30 i doda to otrzymanego wyniku
How many threads can I create? //bad question
How many threads should I create? //good question
Venkat Subramaniam
Computation Intensive vs IO Intensive
Jeżeli wątków będzie za dużo na za małą liczbę rdzeni procesora i na za małą liczbę zadań to może się okazać, że wątki będą oddawały cały czas swój czas procesora na rzecz innego wątku zamiast wykonywać swoje zadanie. Może to być bardzo nieoptymalne. W przypadku jeśli chodzi o Computation problem (czyli złożony problem obliczeniowy) wątków powinno być: liczba wątków <= liczba rdzeni procesora. W przypadku operacji IO dany jest następujący wzór: liczba wątków <= liczba rdzeni procesora / 1 – blocking factor. Blocking factor to liczba z zakresu 0 <= blocking factor < 1. W przypadku kiedy będzie 1 mamy deadlock. Więc w parallel streamie który działa w oparciu o ForkJoinPool można sobie zwiększyć liczbę wątków w CommonPool ale nie jest to zalecane bo może zmniejszyć wydajność. Rozmiar CommonPoola można ustawiać flagą:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=100
wtedy wątki w parallel streamie będą się “batchować” po 100, normalnie batchują sie po standardowym rozmiarze tego common poola + główny wątek (czyli najpewniej łącznie tyle ile jest rdzeni procesora). Można też to zrobić programowo:
ForkJoinPool pool = new ForkJoinPool(100);
pool.submit(() -> stream.forEach(e -> {}));
pool.shutdown();
pool.awaitTermination(10, SEC); //tego nie musi być
Więc jak widać stream wykonuje się nie w tej puli wątków gdzie został stworzony a tam gdzie została wywołana operacja terminalna. Więc można wrzucić taki .parallelStream() do takiego ForkJoinPoola i zmusić programowo do zwiększenia liczby wątków. CO NIE ZNACZY, ŻE BĘDZIE SZYBCIEJ! WRĘCZ ODWROTNIE!
Jak zdecydować kiedy użyć parallel?
- Jak jest dużo obiektów – ma to sens, jak jest mało obiektów – nie ma sensu
- Jak coś się bardzo szybko wywołuje to też nie ma sensu bo nic to nie da, tylko będzie marnowało zasoby (parallel nie ogląda się na zasoby wcale)
Kiedy parallel nie ma też sensu?
Venkat rozważył taką sytuację, filtrował listę osób po płci i wieku. Jeśli chciał użyć .findFirst() to nie miało znaczenia czy użyje sekwencyjnego streama czy parallelizowanego bo ta metoda jest domyślnie ordered. Jeśli natomiast użył .findAny() to mogą być różne wyniki a nie pierwszy jak w przypadku sekwencyjnego streama. Metoda findFirst() bardziej zużywa zasoby jeśli stream jest parallelizowany, ponieważ wątki odpytują każdy rekord w liście (sprawdzając czy spełnia warunek) a nie po kolei jak w sekwencyjnym wywołaniu. Więc złożoność czasowa, może i jest na poziomie 1 ale za to marnuje się więcej pamięci żeby to osiągnąć.
Parallelizm nie martwi się zasobami, na koszt szybszego wywołania.
Zachęcam do obejrzenia całego wykładu oraz do przeczytania streszczenia drugiej części, którą można znaleźć tutaj: Streszczenie: Parallelizm i programowanie asynchroniczne ze Stream API oraz CompletableFuture cz. 2
Jeżeli coś się nie zgadza lub jest jakiś błąd merytoryczny to proszę o kontakt w celu poprawy treści. Uczę się więc mogłem coś lekko pomylić lub czegoś nie do końca zrozumieć w kontekście tego co autor miał na myśli.