JavaRx_reactive_future_blog

Koska RxJava on nyt teoriassa täysin hallussa, on aika kokeilla sitä käytännössä. Graafinen käyttöliittymä tarjoaa hyvän leikkikentän käytännön tekemiseen. Tähän tarkoitukseen sopii hyvin JavaFX, jossa on muihin vaihtoehtoihin verrattuna parempi tuki asynkroniselle ohjelmoinnille. JavaFX on Javan uusin (ja jo kolmas AWT:n ja Swingin jälkeen) työpöytäkäyttöliittymäkirjasto. JavaFX:n ja RxJavan yhteenliittämiseen on myös olemassa valmis kirjasto, mielikuvituksellisesti nimetty RxJavaFX, mutta tässä esimerkissä käytetään vain RxJavaa ja JavaFX:ää, jotta pääsemme tekemään itse enemmän. Esimerkkisovelluksena kehitetään Wikipedia-selain, jolla voi hakea hakusanalla ehdotuksia suomenkielisestä Wikipediasta. Esimerkki on ryöstetty Principles of Reactive Programming -kurssilta, jossa käytettiin Swingiä ja Scalaa. Esimerkkikoodi löytyy GitHubista. Tässä keskitytään koodin reaktiivisiin osuuksiin.

Aloitetaan muussa koodissa käytettävillä apufuktioilla. Ensimmäinen osuus esittää apufunktiot Observablejen käyttöön.

public class Observables {
    
    public static <E extends Event> Observable<E> fromEvent(Node source, EventType<E> type) {
        return Observable.create(subscriber -> {
            EventHandler<E> handler = event -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(event);
                }
            };
            source.addEventHandler(type, handler);
            
            subscriber.add(Subscriptions.create(() -> source.removeEventHandler(type, handler)));
        });
    }
    
    public static <T> Observable<T> fromProperty(ReadOnlyProperty<T> property) {
        return Observable.create(subscriber -> {
            ChangeListener<T> listener = (value, oldV, newV) -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(newV);
                }
            };
            property.addListener(listener);
            
            subscriber.add(Subscriptions.create(() -> property.removeListener(listener)));
        });
    }
    
    public static <T> ReadOnlyProperty<T> toProperty(Observable<T> observable) {
        ReadOnlyObjectWrapper<T> property = new ReadOnlyObjectWrapper<>();
        observable.observeOn(javaFxEventThread())
            .subscribe(value -> property.set(value));
        
        return property.getReadOnlyProperty();
    }
    
    public static <T> ObservableList<T> toObservableList(Observable<List<T>> observable) {
        ObservableList<T> list = FXCollections.observableArrayList();
        
        observable.observeOn(javaFxEventThread())
            .subscribe(l -> list.setAll(l));
        
        return list;
    }
    
    public static <T> Observable<T> fromFuture(CompletableFuture<T> f) {
        return Observable.create(subscriber -> {
            CompletableFuture<Object> handler = f.handleAsync((v, e) -> {
                if (e == null) {
                    subscriber.onNext(v);
                    subscriber.onCompleted();
                } else {
                    subscriber.onError(e);
                }
                return null;
            });
            subscriber.add(Subscriptions.create(() -> handler.cancel(false)));
        });
    }
    
    public static <T> Observable<Optional<T>> recovered(Observable<T> obs) {
        return obs.map(Optional::of).onErrorResumeNext(e -> Observable.just(Optional.<T>empty()));
    }
    
    public static <T> Observable<T> timedOut(Observable<T> obs, long seconds) {
        return obs.takeUntil(Observable.interval(seconds, SECONDS));
    }
    
    public static <T, U> Observable<Optional<U>> concatRecovered(Observable<T> obs, Func1<T, Observable<U>> request) {
        return concat(
            obs.map(request)
                .map(Observables::recovered)
        );
    }
    
}

fromEvent, fromProperty ja fromFuture– funktiot toteuttavat tapahtumien, propertyjen ja futurejen sovittamisen Observableksi. Päinvastaiseen suuntaan on funktiot toProperty ja toObservableList, jotka muuttavat Observablet JavaFX:n käyttöön Propertyksi tai ObservableListiksi. Funktio recovered poistaa Observablesta virheet nostamalla Observablen sisällön Optionaliksi, joka ei virhetilanteessa sisällä arvoa. Tämä aiheuttaa sen, että Observable ei katkea virheen tapahtuessa. timedOut-funktio määrittää aikakatkaisulle yksinkertaisemman käytön.

Toinen osuus apufunktioita sisältää Scheduler-toteutuksen, jolla saadaan JavaFX:n tapahtumasäie käyttöön RxJavaan.

public final class Schedulers {
    
    public static Scheduler javaFxEventThread() {
        return new JavaFxScheduler();
    }
    
    final static class JavaFxScheduler extends Scheduler {

        @Override
        public Worker createWorker() {
            return new JavaFxWorker();
        }
        
    }
    
    final static class JavaFxWorker extends Worker {
        private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        @Override
        public void unsubscribe() {
            executor.shutdown();
        }

        @Override
        public boolean isUnsubscribed() {
            return executor.isShutdown();
        }

        @Override
        public Subscription schedule(Action0 action) {
            Platform.runLater(() -> action.call());
            return Subscriptions.empty();
        }

        @Override
        public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
            ScheduledFuture<?> future = executor.schedule(() -> Platform.runLater(() -> action.call()), delayTime, unit);
            return Subscriptions.create(() -> future.cancel(false));
        }
        
    }

}

Näitä apufunktioita käytetään sovelluksen oleellisimmassa osuudessa. Seuraava koodinpätkä on sovelluksen käyttöliittymän ja Wikipedian palvelimille juttelevan osuuden yhdistävä logiikka.

        Observable<Optional<List<String>>> suggestionStream = Observables.concatRecovered(
            Observables.fromProperty(searchTerm.textProperty())
                .debounce(500, MILLISECONDS), 
            search::suggestionStream
        ).share();
        
        suggestionList.setItems(Observables.toObservableList(
            suggestionStream
                .filter(Optional::isPresent)
                .map(Optional::get)
        ));
        
        Observable<Optional<String>> pages = Observables.recovered(
            Observables.fromProperty(suggestionList.getSelectionModel().selectedItemProperty())
                .map(dirty -> dirty.replace(' ', '_'))
                .flatMap(search::pageStream)
        ).share();
        
        pages.observeOn(Schedulers.javaFxEventThread())
            .filter(Optional::isPresent)
            .map(Optional::get)
            .subscribe(p -> editor.getEngine().loadContent(p));
        
        status.textProperty().bind(Observables.toProperty(
            Observable.merge(suggestionStream, pages)
                .filter(o -> !o.isPresent())
                .map(e -> "Virhe ehdotusten haussa")
        ));

suggestionStream on Observable, joka yhdistää jokaisesta käyttäjän kirjoittamasta hakutermistä (tekstikenttään searchTerm)  tehtävästä ehdotushausta saadun Observablen yhdeksi Observable-putkeksi. Jotta näitä hakuja ei tehdä turhaan liikaa, odotetaan puoli sekuntia, että käyttäjä oikeasti lopettaa kirjoituksen, ennen kuin varsinainen haku tehdään. Tämän toiminnallisuuden toteuttaa debounce-metodi. Tästä lopputuloksena tuotetulle Observablelle kutsutaan share, jotta kun sitä myöhemmin käytetään useamman kerran, niin tehdään haut vain kertaalleen. Seuraavaksi suggestionList-listanäkymään asetetaan listaksi suggestionStreamista löytyvä uusin lista.

pages on vastaava Observable, mutta se yhdistää jokaisesta listanäkymän valinnasta tehtävän sivun haun yhdeksi Observable-putkeksi. Myös tälle kutsutaan share, jotta hakuja ei turhaan toisteta. pages-Observable liitetään web-näkymään lataamalla näkymään ladattu sivu.

Virheiden varalta sekä ehdotukset että sivut yhdistetään status-ilmoitukseen, johon tulee viesti, kun jokin toiminto on epäonnistunut.

Tällä koodilla siis saadaan kaikki toimimaan oikeissa säikeissä, (tyhmä) virheiden hallinta, viivästetty haku ja yhdistettyä Observableista mitään tietämättömät osat toisiinsa.

Seuraavassa koodinpätkässä on hakujen suoritus. Oleellinen osuus on Futuren muuttaminen Observableksi ja hakujen suorittaminen asynkronisesti Java 8:n CompletableFuturea käyttäen.

public class Search {
    ...
    
    public CompletableFuture<List<String>> suggestion(String term) {
        return CompletableFuture.supplyAsync(() -> doSuggest(term));
    }
    
    private List<String> doSuggest(String term) ...

    public CompletableFuture<String> page(String term) {
        return CompletableFuture.supplyAsync(() -> fetchPage(term));
    }
    
    private String fetchPage(String term) ...
    
    public Observable<List<String>> suggestionStream(String term) {
        return fromFuture(suggestion(term));
    }
    
    public Observable<String> pageStream(String term) {
        return fromFuture(page(term));
    }
...
}

Verrattuna Scala-versioon, jonka olin aiemmin tehnyt, Java 8:lla kehitetty versio oli yllättävänkin hyvä. Laajennusmetodit ovat Observablejen kanssa Scalassa miellyttävämpiä käyttää, koska niitä pystyy sujuvasti ketjuttamaan. Yleisesti ottaen en laajennusmetodeista pidä, mutta tässä tapauksessa koodajaan hämääminen voisi olla perusteltua. Toinen Scala-version hyöty on sujuvampi käyttöliittymän rakentaminen (Mika on kertonut tästä enemmän). Muitakin parempia puoli Scala-versiossa oli, mutta niistä olisi myös Javassa saanut parempia muutamalla kirjastolla HTTP:n ja XML:n käsittelyyn.

Tämän esimerkin koodeista voi nähdä, että olemassa olevan koodin sovittaminen Observableksi ei ole vaikeaa. Koska sen seurauksena saa vielä käyttöönsä kasapäin valmiita ominaisuuksia ja oma asynkroninen koodi muodostuu selkeämmäksi, ei ole syytä jättää reaktiivisia laajennuksia kokeilematta. Tulevaisuus on reaktiivinen.