Streszczenie: Parallelizm i programowanie asynchroniczne ze Stream API oraz CompletableFuture cz. II
Poniższy wpis jest streszczeniem drugiej części wykładu Venkata Subramaniama z konferencji Devoxx 2018 na teamat parallelizmu i asynchronicznośći w Javie. Wykład można znaleźć pod tym linkiem: https://www.youtube.com/watch?v=0hQvWIdwnw4.
Completable Futures
- wykonania asynchroniczne
- nieblokujące
- nie czeka się na wywołanie metody ani nawet na jej start
Jak to było z Future?
Zbieranie wyników wyglądało mniej więcej tak:
Future future = call();
...performing another operations
future.get(); - you get stuck
Jak określił to Venkat – “It is not future at all”.
Co ze streamami?
Jak wygląda obsługa błędów w java Stream API? Wygląda na zasadzie: “good luck”. Programowanie funkcyjne w przypadku streamów nie za bardzo lubi się z obsługą wyjątków i dość ciężko to zrobić.
Lekcja z JavaScript
JavaScript posiadała coś takiego jak Callbacks jednak:
- występował brak spójności
- ciężko się to komponowało z innymi callbackami (callback hell)
- nie było spójności w radzeniu sobie z błędami
Potem przyszły Promises
Promises posiadają 3 stany: may resolve, reject (error), pending (decide), pierwsze 2 są niezmienne, pending może zmienić się w inny stan.
Promises (które przesyłają 0 lub 1 piece of data) posiadają 2 kanały komunikacji:
- data ——–>
- error ——-> (to jest inna forma danych, ale nadal danych, “errors are first class citiziens“)
Na końcu jest rezultat w postaci danych lub błędu.
Completable Futures to takie javascriptowe Promises na sterydach. Można je nazwać “Promises of Java world“. To jest ta sama idea, ten sam pomysł. Są bardzo podobne w działaniu.
Completable Futures też mają stages (stage = pipeline of execution), można przechodzić ze stanu do stanu itd. tylko, że każdy stage bierze CompletableFuture i zwraca CompletableFuture. Można więc je porównać (jak Venkat to zrobił) do złego charakteru w filmach który zawsze powraca, lub do młodu thora, lub do ciągle jadącego nieskończonego pociągu – nie ma sposobu na “zabicie” takiego CompletableFuture bo zawsze tworzy nowy i nowy. Takie wywołanie jest nieskończone, po prostu należy wsiąść do pociągu, zabrać swoje dane i się ulotnić.
Edit: doczytałem w innym źródle, że można zakończyć CompletableFuture albo poprzez zakończenie puli wątków albo wywołanie cancel() co jednak rzuci CancellationException dla siebie i dla zależnych transformacji.
Jak tworzyć CompletableFuture?
Należy zaimportować java.util.concurrent.*, ponieważ z takiego pakietu pochodzą. Następnie:
public static CompletableFuture create() {
return CompletableFuture.supplyAsync(() -> 2); //to akurat zwraca tylko 2
}
//i później np.
public static void main(String[] args) {
CompletableFuture future = create();
//oraz proste zbieranie daty do consumera
future.thenAccept(data -> System.out.println(data));
}
No dobrze ale powyżej tylko wydrukowaliśmy dane, nie zabraliśmy ich. Co jeśli chcielibyśmy dać kolejne .thenAccept()? Nie da się bo poprzednie wywołanie nie zwraca nam nic do zaakceptowania, zwraca void którego nie da się procesować. Można sobie tutaj co najwyżej użyć .thenRun(() -> System.out.println(“Powyżej wydrukowaliśmy dane”); i wydrukować informację. Można to nadal kontynuować i dołożyć kolejne .thenRun(), które nic nie przyjmują ani nic nie zwracają ale mogą coś wykonać (to jest odpowiednik zachowania z interfejsu funkcyjnego Runnable).
thenAccept? thenRun? Co to takiego?
CompletableFuture używa tego samego api co streamy tylko twórcy nadali tym metodom inne nazwy (dlaczego? – ja nie wiem). Należałoby zatem przypomnieć sobie kilka popularnych interfejsów funkcyjnych oraz do czego na przykład są używane w streamach:
- Supplier<T> – T get(); – factories
- Predicate<T> – boolean test(T); – filter
- Function<T, R> – R apply(T); – map
- Consumer<T> void accept(T); – forEach używa consumera
Zatem w interfejsach funkcyjnych mamy apply() -> tutaj mamy thenApply(), tam mamy accept(), tutaj thenAccept() itd.
Jak więc pobrać dane?
Musimy z nimi po prostu coś zrobić, albo przekazać dalej do procesowania, albo wydrukować, albo zmapować, da się jeszcze użyć takich metod jak .get() i .getNow(0). ale nie jest to zbytnio zalecane bo:
- .get() – to jest blokujący call i jak ma jakiś wait w środku to będzie czekało na rezultat
- .getNow(0) – jest co prawda nieblokujący ale też niecierpliwy, po prostu zamiast czekać na rezultat zwraca argument przekazany, w tym przypdaku 0
.thenAccept() – jest zatem Consumerem – zwraca CompletableFuture<Void>
.thenRun() – jest Runnable – można np użyć do wyświetlenia informacji lub zareportowaniu czegoś
W którym wątku wykonują się operacje?
To że patrzymy na ten CompletableFuture to nie znaczy, że wiemy który wątek to wykonuje. Jeśli CompletableFuture jest już completed to następne operacje wykonują się w głównym wątku (znaczy tym z którego wyszliśmy), jeśli nie jest jeszcze completed to w innym. Czekanie na rezultat nie może nam blokować wątku, trzeba kontynuować pipeline. Można sobie zmienić wywołanie z common pool na swoją własną pulę.
ForkJoinPool pool = new ForkJoinPool(10);
//i później
return CompletableFuture.supplyAsync(() -> compute(), pool);
.supplyAsync() to metoda która zwraca CompletableFuture który wykona się właśnie w ForkJoinPool o ile nie podamy swojej puli wątków.
Im laazy
Podobnie jak streamy są leniwe tak też tutaj możemy odroczyć wykonanie takiego pipeline. Najpierw tworzymy pipelnie a na końcu pushujemy do niego dane.
CompletableFuture future = new CompletableFuture();
future.thenApply(data -> data * 2)
.thenApply(data -> data + 1)
.thenAccept(data -> System.out.println(data));
//i gdzieś dalej być może w innej metodzie
future.complete(2);
//wynikiem będzie wydrukowanie liczby 5
Porównanie Streamów i CompletableFuture
Stream | CompletableFuture |
---|---|
zero, one or more data | zero or one data |
pipeline | pipeline |
only data channel | data and error channels |
lazy | lazy channel |
forEach | thenAccept |
map | thenApply (np. transformuje CF<T> -> CF<R>) |
exceptions – “oops” | error channel |
streamy nie mają czegoś takiego jak zip | combine |
flatMap | thenCompose |
function returns data – map function returns Stream – flatMap | function returns data – thenAccept/thenApply function returns CompletableFuture – thenCompose |
Radzenie sobie z wyjątkami
Jeśli wszystko jest ok to używamy przeważnie metod z then jeśli coś pójdzie źle to są to przeważnie metody oznaczone exceptionally. Np:
.exceptionally() – to nic innego jak taki “catch”
Jeśli wszystko idzie dobrze to pipeline idzie do najbliższego then a jeśli coś idzie nie tak to do najbliższego exceptionally. Ale jako, że między kanałami można przeskakiwać to można także “przywrócić się do życia” po rzuceniu wyjątku, po prostu trzeba zwrócić jakąś wartość i dać następne then po tym.
Odpowiednikiem future.complete(2);
jest future.completeExceptionally(new RuntimeException("Don't tell the boss");
Co jeśli czekamy w nieskończoność?
Powiedzmy, że nie mamy complete, completeExceptionally ani supplyAsync, wtedy stan jest cały czas pending (czeka na dane). Ale ile czasu będzie czekać na te dane?
Both in life and programming never do something without timeout
Venkat Subramaniam
Trzeba używać timeoutów. Nie było timeoutu w Java 8 ale od Javy 9 się pojawił. Były dwie metody w Javie 9: future.completeOnTimeout(0, 10, TimeUnit.SECONDS);
0 będzie przetwarzane jeśli CompletableFuture nie stanie się completed w ciągu 10 sekund. To jest pozytywna ścieżka. W negatywnej nie ma słówka exceptionally tym razem, zatem jest to onTimeout(2, TimeUnit.SECONDS);
Przekaże 0 jako argument i rzuci wyjątek bo nie posiada danych. Jeśli wcześniej będzie gdzieś .complete(2)
to te wywołania timeoutowe są pomijane.
Combine and Compose
W javie jest statyczne typowanie zatem nie możemy sobie zwrócić albo CF albo danych jak np. w javascript (jak są dane to zwraca dane, jak nie ma to promise).
Combine bierze jeden CompletableFuture i łączy z rezultatem innego CompletableFuture więc np:
private static CompletableFuture create(int number) {
return CompletableFuture.supplyAsync(() -> number);
}
//i potem
create(2).thenCombine(create(3), (result1, result2) -> result1 + result2).thenAccept(data -> System.out.println(data));
//jak taki "join" czyli zwraca 5
Compose – w streamach mamy flatMap a tutaj thenCompose(), wygląda to mniej więcej tak:
private static CompletableFuture increment(int number) {
return CompletableFuture.supplyAsync(() -> number + 1):
}
//i później
create(2)
.thenCompose(data -> increment(data))
.thenAccept(result -> System.out.println(data));
// nie można tu użyć thenApply bo increment nie zwraca danych tylko CompletableFuture
Są to podstawy głównie teoretyczne i api zmienia się z kolejnymi wyjściami wersjami Javy. Znalazłem też przydatny artykuł opisujący poszczególne metody (których jest dużo więcej: https://codecouple.pl/2018/04/06/completablefuture-dlaczego-po-co-jak/).
Zachęcam oczywiście do obejrzenia całego wykładu.
Jeżeli coś się nie zgadza lub jest jakiś błąd merytoryczny to proszę o kontakt w celu poprawy treści. Uczę się więc mogłem coś lekko pomylić lub czegoś nie do końca zrozumieć w kontekście tego co autor miał na myśli.