Browse Category: Wielowątkowość

Streszczenie: Parallelizm i programowanie asynchroniczne ze Stream API oraz CompletableFuture cz. II

Poniższy wpis jest streszczeniem drugiej części wykładu Venkata Subramaniama z konferencji Devoxx 2018 na teamat parallelizmu i asynchronicznośći w Javie. Wykład można znaleźć pod tym linkiem: https://www.youtube.com/watch?v=0hQvWIdwnw4.

Completable Futures

  • wykonania asynchroniczne
  • nieblokujące
  • nie czeka się na wywołanie metody ani nawet na jej start

Jak to było z Future?

Zbieranie wyników wyglądało mniej więcej tak:

Future future = call();
...performing another operations
future.get(); - you get stuck

Jak określił to Venkat – “It is not future at all”.

Co ze streamami?

Jak wygląda obsługa błędów w java Stream API? Wygląda na zasadzie: “good luck”. Programowanie funkcyjne w przypadku streamów nie za bardzo lubi się z obsługą wyjątków i dość ciężko to zrobić.

Lekcja z JavaScript

JavaScript posiadała coś takiego jak Callbacks jednak:

  • występował brak spójności
  • ciężko się to komponowało z innymi callbackami (callback hell)
  • nie było spójności w radzeniu sobie z błędami

Potem przyszły Promises

Promises posiadają 3 stany: may resolve, reject (error), pending (decide), pierwsze 2 są niezmienne, pending może zmienić się w inny stan.

Promises (które przesyłają 0 lub 1 piece of data) posiadają 2 kanały komunikacji:

  • data ——–>
  • error ——-> (to jest inna forma danych, ale nadal danych, “errors are first class citiziens“)

Na końcu jest rezultat w postaci danych lub błędu.

Completable Futures to takie javascriptowe Promises na sterydach. Można je nazwać “Promises of Java world“. To jest ta sama idea, ten sam pomysł. Są bardzo podobne w działaniu.

Completable Futures też mają stages (stage = pipeline of execution), można przechodzić ze stanu do stanu itd. tylko, że każdy stage bierze CompletableFuture i zwraca CompletableFuture. Można więc je porównać (jak Venkat to zrobił) do złego charakteru w filmach który zawsze powraca, lub do młodu thora, lub do ciągle jadącego nieskończonego pociągu – nie ma sposobu na “zabicie” takiego CompletableFuture bo zawsze tworzy nowy i nowy. Takie wywołanie jest nieskończone, po prostu należy wsiąść do pociągu, zabrać swoje dane i się ulotnić.

Edit: doczytałem w innym źródle, że można zakończyć CompletableFuture albo poprzez zakończenie puli wątków albo wywołanie cancel() co jednak rzuci CancellationException dla siebie i dla zależnych transformacji.

Jak tworzyć CompletableFuture?

Należy zaimportować java.util.concurrent.*, ponieważ z takiego pakietu pochodzą. Następnie:

public static CompletableFuture create() {
    return CompletableFuture.supplyAsync(() -> 2); //to akurat zwraca tylko 2
}
//i później np.
public static void main(String[] args) {
    CompletableFuture future = create();
    //oraz proste zbieranie daty do consumera
    future.thenAccept(data -> System.out.println(data));
}

No dobrze ale powyżej tylko wydrukowaliśmy dane, nie zabraliśmy ich. Co jeśli chcielibyśmy dać kolejne .thenAccept()? Nie da się bo poprzednie wywołanie nie zwraca nam nic do zaakceptowania, zwraca void którego nie da się procesować. Można sobie tutaj co najwyżej użyć .thenRun(() -> System.out.println(“Powyżej wydrukowaliśmy dane”); i wydrukować informację. Można to nadal kontynuować i dołożyć kolejne .thenRun(), które nic nie przyjmują ani nic nie zwracają ale mogą coś wykonać (to jest odpowiednik zachowania z interfejsu funkcyjnego Runnable).

thenAccept? thenRun? Co to takiego?

CompletableFuture używa tego samego api co streamy tylko twórcy nadali tym metodom inne nazwy (dlaczego? – ja nie wiem). Należałoby zatem przypomnieć sobie kilka popularnych interfejsów funkcyjnych oraz do czego na przykład są używane w streamach:

  • Supplier<T> – T get(); – factories
  • Predicate<T> – boolean test(T); – filter
  • Function<T, R> – R apply(T); – map
  • Consumer<T> void accept(T); – forEach używa consumera

Zatem w interfejsach funkcyjnych mamy apply() -> tutaj mamy thenApply(), tam mamy accept(), tutaj thenAccept() itd.

Jak więc pobrać dane?

Musimy z nimi po prostu coś zrobić, albo przekazać dalej do procesowania, albo wydrukować, albo zmapować, da się jeszcze użyć takich metod jak .get() i .getNow(0). ale nie jest to zbytnio zalecane bo:

  • .get() – to jest blokujący call i jak ma jakiś wait w środku to będzie czekało na rezultat
  • .getNow(0) – jest co prawda nieblokujący ale też niecierpliwy, po prostu zamiast czekać na rezultat zwraca argument przekazany, w tym przypdaku 0

.thenAccept() – jest zatem Consumerem – zwraca CompletableFuture<Void>

.thenRun() – jest Runnable – można np użyć do wyświetlenia informacji lub zareportowaniu czegoś

W którym wątku wykonują się operacje?

To że patrzymy na ten CompletableFuture to nie znaczy, że wiemy który wątek to wykonuje. Jeśli CompletableFuture jest już completed to następne operacje wykonują się w głównym wątku (znaczy tym z którego wyszliśmy), jeśli nie jest jeszcze completed to w innym. Czekanie na rezultat nie może nam blokować wątku, trzeba kontynuować pipeline. Można sobie zmienić wywołanie z common pool na swoją własną pulę.

ForkJoinPool pool = new ForkJoinPool(10);
//i później
return CompletableFuture.supplyAsync(() -> compute(), pool);

.supplyAsync() to metoda która zwraca CompletableFuture który wykona się właśnie w ForkJoinPool o ile nie podamy swojej puli wątków.

Im laazy

Podobnie jak streamy są leniwe tak też tutaj możemy odroczyć wykonanie takiego pipeline. Najpierw tworzymy pipelnie a na końcu pushujemy do niego dane.

CompletableFuture future = new CompletableFuture();
future.thenApply(data -> data * 2)
            .thenApply(data -> data + 1)
            .thenAccept(data -> System.out.println(data));
//i gdzieś dalej być może w innej metodzie
future.complete(2);
//wynikiem będzie wydrukowanie liczby 5

Porównanie Streamów i CompletableFuture

StreamCompletableFuture
zero, one or more datazero or one data
pipelinepipeline
only data channeldata and error channels
lazylazy channel
forEachthenAccept
mapthenApply (np. transformuje CF<T> -> CF<R>)
exceptions – “oops”error channel
streamy nie mają czegoś takiego jak zipcombine
flatMapthenCompose
function returns data – map
function returns Stream – flatMap
function returns data – thenAccept/thenApply
function returns CompletableFuture – thenCompose

Radzenie sobie z wyjątkami

Jeśli wszystko jest ok to używamy przeważnie metod z then jeśli coś pójdzie źle to są to przeważnie metody oznaczone exceptionally. Np:

.exceptionally() – to nic innego jak taki “catch”

Jeśli wszystko idzie dobrze to pipeline idzie do najbliższego then a jeśli coś idzie nie tak to do najbliższego exceptionally. Ale jako, że między kanałami można przeskakiwać to można także “przywrócić się do życia” po rzuceniu wyjątku, po prostu trzeba zwrócić jakąś wartość i dać następne then po tym.

Odpowiednikiem future.complete(2); jest future.completeExceptionally(new RuntimeException("Don't tell the boss");

Co jeśli czekamy w nieskończoność?

Powiedzmy, że nie mamy complete, completeExceptionally ani supplyAsync, wtedy stan jest cały czas pending (czeka na dane). Ale ile czasu będzie czekać na te dane?

Both in life and programming never do something without timeout

Venkat Subramaniam

Trzeba używać timeoutów. Nie było timeoutu w Java 8 ale od Javy 9 się pojawił. Były dwie metody w Javie 9: future.completeOnTimeout(0, 10, TimeUnit.SECONDS); 0 będzie przetwarzane jeśli CompletableFuture nie stanie się completed w ciągu 10 sekund. To jest pozytywna ścieżka. W negatywnej nie ma słówka exceptionally tym razem, zatem jest to onTimeout(2, TimeUnit.SECONDS); Przekaże 0 jako argument i rzuci wyjątek bo nie posiada danych. Jeśli wcześniej będzie gdzieś .complete(2) to te wywołania timeoutowe są pomijane.

Combine and Compose

W javie jest statyczne typowanie zatem nie możemy sobie zwrócić albo CF albo danych jak np. w javascript (jak są dane to zwraca dane, jak nie ma to promise).

Combine bierze jeden CompletableFuture i łączy z rezultatem innego CompletableFuture więc np:

private static CompletableFuture create(int number) {
    return CompletableFuture.supplyAsync(() -> number); 
}
//i potem 
create(2).thenCombine(create(3), (result1, result2) -> result1 + result2).thenAccept(data -> System.out.println(data)); 
//jak taki "join" czyli zwraca 5

Compose – w streamach mamy flatMap a tutaj thenCompose(), wygląda to mniej więcej tak:

private static CompletableFuture increment(int number) {
    return CompletableFuture.supplyAsync(() -> number + 1):
}
//i później
create(2)
    .thenCompose(data -> increment(data))
    .thenAccept(result -> System.out.println(data));
// nie można tu użyć thenApply bo increment nie zwraca danych tylko CompletableFuture

Są to podstawy głównie teoretyczne i api zmienia się z kolejnymi wyjściami wersjami Javy. Znalazłem też przydatny artykuł opisujący poszczególne metody (których jest dużo więcej: https://codecouple.pl/2018/04/06/completablefuture-dlaczego-po-co-jak/).

Zachęcam oczywiście do obejrzenia całego wykładu.

Jeżeli coś się nie zgadza lub jest jakiś błąd merytoryczny to proszę o kontakt w celu poprawy treści. Uczę się więc mogłem coś lekko pomylić lub czegoś nie do końca zrozumieć w kontekście tego co autor miał na myśli.

Streszczenie: Parallelizm i programowanie asynchroniczne ze Stream API oraz CompletableFuture cz. I

Poniższy wpis jest streszczeniem wykładu Venkata Subramaniama z konferencji Devoxx 2018 na teamat parallelizmu i asynchronicznośći w Javie. Wykład można znaleźć pod tym linkiem: https://www.youtube.com/watch?v=0hQvWIdwnw4.

Cały wykład, czyli cześć pierwsza i druga trwają 3h14minut więc postaram się napisać co udało mi się zapamiętać i zrozumieć z tej lekcji. Venkat to świetny prelegent, który sprawia, że dość ciężkie zagadnienia “wchodzą” z łatwością do głowy.

Pierwsza część obejmuje parallelizm

Oba pojęcia czyli parallel i programowanie asynchroniczne różnią się od siebie, nie jest to to samo.

Parallelizacja – rozdzielamy jakieś zadania na mniejsze (fork), dzieląc je między dostępne wątki, na końcu łączymy (join) i zbieramy rezultat

Asynchronicznośc – operacje wykonywane są asynchronicznie, nie trzeba czekać na dostarczenie wszystkich wyników operacji z każdego wątku, główna operacja (praca) nadal trwa

Venkat porównał to do imprezy. Parallel to kiedy mówimy jednej osobie, żeby przyniosła pizzę a drugiej aby przyniosła napoje, rozdzielamy więc pracę i kiedy obie osoby przyniosą swoje składniki rozpoczynamy zabawę. Asynchroniczność to kiedy mówimy jednej osobie żeby przyniosła pizzę a drugiej aby przyniosła napoje z tym, że kiedy nie ma obu osób możemy już się bawić i tańczyć, kiedy pierwsza osoba przyniesie pizzę, jemy pizzę i dalej się bawimy, następnie druga osoba przynosi napoje, cały czas trwa impreza. Nie musimy czekać na każdy składnik aby ją zacząć.

Luźne notatki:

Streams - addiction
Collection Pipeline Pattern
//functional style - functional composition
stream() - internal iterator (tak to można nazwać)
Functional composition -> Collection Pipeline
//imperative style has accidental complexity
//functional style has less complexity and it is also easier to parallelize

Parallelizowany stream można utworzyć na dwa sposoby:

  • parallelStream() – kiedy jesteśmy kreatorem streama
  • stream().parallel() – kiedy nie jesteśmy kreatorem tylko np. dostajemy sekwencyjny stream jako wynik jakiejś funkcji, argument itp.

Venkat podkreślił, że kod sekwencyjny ma bardzo różniącą się strukturę od kodu wielowątkowego. Kiedy używamy synchronized, bardzo nadruszamy sobie strukturę kodu, do tego trzeba zapewne łapać wyjątki związane z wątkami. Kiedy używamy streamów struktura sekwencyjnego kodu jest identyczna ze strukturą kodu wielowątkowego.

  • sequential() – niweluje parallelizm w streamie

Jeśli mamy oba zarówno parallel() jak i sequential() w streamie to zastosowana jest zasada last wins, ostatnim jest ten najbliżej operacji terminalnej.

W kolejnych “slajdach” Venkat pokazał różnice miedzy Streamami a Streamami Reaktywnymi.

We solve one set of problems only to create a new set of problems

Tak o kolejnych generacjach wielowątkowości mówił Venkat Subramaniam
  • Java 1 – Threads – wiemy jak wygląda praca z wątkami na najniższym poziomie i jak wygląda kod
  • Java 5 – ExcecutorServices – duże uproszczenie, wrzucanie zadań do worka i czekanie na odpowiedź, z tym że mogły wywoływać Pool deadlock czyli sytuację, że za duże żądania cały czas dzielone na mniejsze przez wątki executora mogły wywoływać deadlock, wątki po podzieleniu zadania przekazywały zadanie dalej i czekały na kolejne zadanie do podziału, tylko, że czasami mogło zabraknąć wątków które mogłby wykonywać pracę tych podzielonych części i następował deadlock
  • Java 7 – ForkJoinPool – wątki dzielą probemy ale już nie czekają bez końca na kolejne dzielenie zadań tylko wykonują pracę innego wątku, nie marnuje się wątku tylko na czekanie, jest to bardziej efektywne wykorzystanie czasu procesora, jest to tak zwany work stealing – podbieranie pracy innego wątku kiedy nasz wątek za długo jest bezczynny

Jak wiadomo parallel nie zachowuje kolejności wywołań.

Niektóre metody są właściwie uporządkowane (inherently ordered) czyli muszą mieć zachowaną jakąś kolejność działań. Inne natomiast mogą nie być uporządkowane lecz mieć uporządkowane odpowiedniki (ordered counterpart). Aby zachować i wielowątkowość w streamie i kolejność wywołań, np. odczyt można użyć .forEachOrdered() jako operację terminalną na parallel streamie. Nadal jest to wielowątkowe wywołanie, bo zadania wykonują się równolegle jednak muszą poczekać z dokończeniem pracy na poprzednika. Metoda .forEachOrdered() gwarantuje uporządkowanie tylko jeśli przekazany miał już wcześniej możliwość bycia uporządkowanym np. tak jak Lista. Na Secie nie zadziała bo ta metoda nie wyczaruje tego uporządkowania, ona ma je tylko zachować.

  • Parallel and filter (OK)
  • Parallel and map (OK)
  • Parallel and reduce (BARDZO NIE OK), to znaczy jeśli seed czyli jak Venkat mówił NIE initial value a Identity Value (czyli wartość przekazana do operacji nie zmieniająca wyniku operacji np. w mnożeniu to liczba 1: x * 1 = x, w dodawaniu 0: x + 0 = x) to wtedy się wszystko zgadza i jest OK ale jeśli nie jest Identity to wyniki bardzo się rozjeżdżają bo każdy wątek bierze sobie tą wartość początkową i nie patrzy na to co robią inne wątki, zatem jeśli w streamie sumującym ustawimy wartość początkową na np. 30 to każdy wątek na początku sobie weźmie 30 i doda to otrzymanego wyniku

How many threads can I create? //bad question

How many threads should I create? //good question

Venkat Subramaniam

Computation Intensive vs IO Intensive

Jeżeli wątków będzie za dużo na za małą liczbę rdzeni procesora i na za małą liczbę zadań to może się okazać, że wątki będą oddawały cały czas swój czas procesora na rzecz innego wątku zamiast wykonywać swoje zadanie. Może to być bardzo nieoptymalne. W przypadku jeśli chodzi o Computation problem (czyli złożony problem obliczeniowy) wątków powinno być: liczba wątków <= liczba rdzeni procesora. W przypadku operacji IO dany jest następujący wzór: liczba wątków <= liczba rdzeni procesora / 1 – blocking factor. Blocking factor to liczba z zakresu 0 <= blocking factor < 1. W przypadku kiedy będzie 1 mamy deadlock. Więc w parallel streamie który działa w oparciu o ForkJoinPool można sobie zwiększyć liczbę wątków w CommonPool ale nie jest to zalecane bo może zmniejszyć wydajność. Rozmiar CommonPoola można ustawiać flagą:

-Djava.util.concurrent.ForkJoinPool.common.parallelism=100

wtedy wątki w parallel streamie będą się “batchować” po 100, normalnie batchują sie po standardowym rozmiarze tego common poola + główny wątek (czyli najpewniej łącznie tyle ile jest rdzeni procesora). Można też to zrobić programowo:

ForkJoinPool pool = new ForkJoinPool(100);
pool.submit(() -> stream.forEach(e -> {}));
pool.shutdown();
pool.awaitTermination(10, SEC); //tego nie musi być

Więc jak widać stream wykonuje się nie w tej puli wątków gdzie został stworzony a tam gdzie została wywołana operacja terminalna. Więc można wrzucić taki .parallelStream() do takiego ForkJoinPoola i zmusić programowo do zwiększenia liczby wątków. CO NIE ZNACZY, ŻE BĘDZIE SZYBCIEJ! WRĘCZ ODWROTNIE!

Jak zdecydować kiedy użyć parallel?

  • Jak jest dużo obiektów – ma to sens, jak jest mało obiektów – nie ma sensu
  • Jak coś się bardzo szybko wywołuje to też nie ma sensu bo nic to nie da, tylko będzie marnowało zasoby (parallel nie ogląda się na zasoby wcale)

Kiedy parallel nie ma też sensu?

Venkat rozważył taką sytuację, filtrował listę osób po płci i wieku. Jeśli chciał użyć .findFirst() to nie miało znaczenia czy użyje sekwencyjnego streama czy parallelizowanego bo ta metoda jest domyślnie ordered. Jeśli natomiast użył .findAny() to mogą być różne wyniki a nie pierwszy jak w przypadku sekwencyjnego streama. Metoda findFirst() bardziej zużywa zasoby jeśli stream jest parallelizowany, ponieważ wątki odpytują każdy rekord w liście (sprawdzając czy spełnia warunek) a nie po kolei jak w sekwencyjnym wywołaniu. Więc złożoność czasowa, może i jest na poziomie 1 ale za to marnuje się więcej pamięci żeby to osiągnąć.

Parallelizm nie martwi się zasobami, na koszt szybszego wywołania.

Zachęcam do obejrzenia całego wykładu oraz do przeczytania streszczenia drugiej części, którą można znaleźć tutaj: Streszczenie: Parallelizm i programowanie asynchroniczne ze Stream API oraz CompletableFuture cz. 2

Jeżeli coś się nie zgadza lub jest jakiś błąd merytoryczny to proszę o kontakt w celu poprawy treści. Uczę się więc mogłem coś lekko pomylić lub czegoś nie do końca zrozumieć w kontekście tego co autor miał na myśli.


Close Bitnami banner
Bitnami