Marmorikuulat-marble-glass

Reaktiivisista laajennuksista on Javassa useampia kirjastototeutuksia. Reactive Streams sisältää rajapinnat, jotka on tarkoitettu erilaisten reaktiivisten kirjastojen yhteistoimintaan. ReactFX on reaktiivisten laajennusten toteutus JavaFX-käyttöliittymäkirjastolle. Ehkä merkittävin on kuitenkin Netflixiltä lähtöisin oleva RxJava, joka perustuu Microsoftin alkuperäiseen Rx.NET-kirjastoon. RxJava on myös se kirjasto, mihin tämä kirjoitus keskittyy. Tässä esitellään RxJavan ohjelmointirajapintaa ja sen peruskäyttöä. Esimerkkinä käytetään JavaBean-määrittelyn mukaisen luokan sovittamista Rx:ään.

RxJavassa tarkkailtava on Observable ja oleellisilta osiltaan se näyttää tältä:

public class Observable<T> {
  public static <T> Observable<T> create(Action1<Subscriber<T>> onSubscribe) ...
  ...
  public Subscription subscribe(Action1<T> onNext) ...
  public Subscription subscribe(Action1<T> onNext, Action1<Throwable> onError, Action0 onComplete) ...
  public Subscription subscribe(Observer<T> observer) ...
  public Subscription subscribe(Subscriber<T> observer) ...
  ...
}

Useamman metodin parametrina käyttämä Action1 on RxJavan yksimetodinen rajapinta (@FunctionalInterface Java 8:ssa) toiminnolle, joka ottaa yhden parametrin, eikä palauta mitään. Action0 on vastaava rajapinta, joka ei ota yhtään parametria.

Tyypillisin tapa luoda oma Observable on käyttää create-metodia. Se ottaa parametrinaan toiminnon, joka suoritetaan, kun joku rekisteröityy kuuntelemaan luotua Observablea. Esimerkiksi tavallisesta JavaBeanista voisi luoda yksinkertaisen Observablen, joka tuottaa paikkatietoa Point2D-muodossa, näin:

Bean b = ...
Observable<Point2D> obs = Observable.create(subscriber -> {
  b.addPropertyChangeListener(Bean.PROP_POSITION, event -> subscriber.onNext((Point2D) event.getNewValue()));
});

Tässä rekisteröinnissä suoritettavaksi toiminnoksi annetaan lambda (jonka kääntäjä muuttaa Action1-rajapinnan toteutukseksi), jossa rekisteröidään kuuntelija JavaBean-määrittelyn mukaiselle Bean-oliolle. Tässä kuuntelijassa annetaan ominaisuuden uusi arvo eteenpäin Observablelle rekisteröityneelle subscriberille. Tässä esimerkissä ei vielä oteta huomioon, että subscriber voisi joskus lopettaa kuuntelun.

Observablen subscribe-metodit rekisteröivät tarkkailijan tarkkailtavalle vähän eri muodoissa. Yksinkertaisin tapaus, jossa ei välitetä virheistä, on käyttää ensimmäistä metodia ja antaa Action1-rajapinnan toteutukseksi lambda. Esimerkiksi Observablen kaikkien uusien arvojen kirjoittaminen stdoutiin onnistuisi seuraavasti:

Observable<Point2D> obs = ...
obs.subscribe(o -> System.out.println(o));

subscribelle parametrina annettu lambda suoritetaan aina, kun Observable tuottaa uuden arvon.

JavaBeanin tapauksessa onNext-toteutus voi olla riittävä, mutta yleensä virheet ovat mahdollisia ja tarkkailtava virta voi myös loppua ohjelman ajon aikana. Näitä tapauksia varten tarvitaan joko seuraavaa subscribe-metodia, jolle voi antaa kolme lambdaa tai sitten on toteuttava oikea tarkkailijarajapinta Observer. Tämä rajapinta näyttää puoletaan tältä:

public interface Observer<T> {
  void onNext(T t);
  void onError(Throwable t);
  void onCompleted();
}

Observerin onError– ja onCompleted-metodit ovat reaktiivisten laajennusten lisäykset perinteiseen tarkkailijaan. Ne molemmat päättävät sekvenssin; kun jompaakumpaa on kutsuttu, mitään muuta Observerin metodia ei enää kutsuta.
Kaikki Observablen subscribe-metodit palauttavat Subscription-olion, jota metodin kutsuja voi käyttää lopettaakseen tarkkailtavan tarkkailun. Subscription-rajapinta näyttää seuraavalta:

public interface Subscription {
  void unsubscribe();
  boolean isUnsubscribed();
}

unsubscribe-metodilla voi lopettaa Observablen kuuntelun. isUnsubscribed-metodilla taas voidaan tarkastaa, onko kuuntelu lopetettu.

Edelliset rajapinnat (Observer ja Subscription) yhdistämällä saadaan Subscriber, joka on erityisen hyödyllinen Observablen luonnissa (ks. create-metodin tyyppiallekirjoitus). Sen ansiosta siellä ei ole tarvetta käsitellä erikseen Observer– ja Subscription-olioita. Subscriberin määrittely näyttää seuraavalta:

public abstract class Subscriber<T> implements Observer<T>, Subscription {
...
  public void add(Subscription s) ... 
}

add-metodilla voi rekisteröidä Subscriberille toisen Subscriptionin, jonka tilaa synkronoidaan Subscriberin tilan (unsubscribed) kanssa.

Jos JavaBean-Observableen nyt lisää Subcriptionin käsittelyn, se voisi näyttää tältä:

Bean b = ...
Observable<Point2D> obs = Observable.create(subscriber -> {
  PropertyChangeListener listener = event -> {
    // Ei lähetetä turhia arvoja eteenpäin
    if (!subscriber.isUnsubscribed()) {
      subscriber.onNext((Point2D) event.getNewValue());
    }
  };
  b.addPropertyChangeListener(Bean.PROP_POSITION, listener);

  // Rekisteröidään toiminto, jolla poistetaan kuuntelija, kun subscriber poistuu
  subscriber.add(Subscriptions.create(() -> b.removePropertyChangeListener(Bean.PROP_POSITION, listener));
});

Verrattuna yksinkertaiseen versioon, tässä versiossa tarkastetaan ennen uusien arvojen lähetystä, onko Observablella enää kuuntelijaa. Lisäksi subscriberiin rekisteröidään uusi Subscription, jossa on toiminto kuuntelijan poistamiseksi, kun Subscription-oliolle kutsutaan unsubscribe (Subscriptions.create luo uuden Subscription-olion, jolle annettu lambda ajetaan, kun sille kutsutaan unsubscribe).

Näillä resepteillä pystyy luomaan omia Observableja ja tarkkailemaan niitä, mutta siinä ei sinällään ole mitään uutta. RxJavan hyödyt tulevat esiin, kun näille halutaan tehdä operaatioita. Oikeassa Observable-luokassa on niin suuri määrä metodeja, että pelkästään niiden allekirjoitukset ovat liikaa tähän kirjoitukseen. Seuraavassa muutaman operaattorin määrittelyt esimerkkiä varten:

public class Observable<T> {
  public Observable<T> filter(Func1<? super T, Boolean> predicate) ...
  public Observable<T> mergeWith(Observable<? extends T> t1) ...
  public <R> Observable<R> map(Func1<? super T, ? extends R> func) ...
  public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) ...
}

Näistä filter ja map vastaavat Java 8:n Stream-rajapinnan (ja yleisemmin funktionaalisia listaoperaatioita) metodeita. filter tuottaa uuden Observablen, joka sisältää vain ne arvot, jotka täyttävät sille annetun ehdon. map tuottaa uuden Observablen, jonka arvot on tuotettu Observablen arvoista func-funktiolla. mergeWith-metodilla ei ole suoraan verrokkia, sillä ajalla on sen lopputulokseen merkittävä vaikutus. mergeWith yhdistää kaksi Observablea yhdeksi siten, että sen seuraava arvo tulee siitä lähde-Observablesta, joka ehtii tuottaa uuden arvon. Observable tuottaa virheen tai päättymisen heti kun kumpi tahansa sen lähde-Observableista tuottaa sen. throttleLast kuristaa Observablea siten, että se tuottaa vain aikaikkunan viimeisimmän arvon.

Esimerkiksi, jos haluttaisiin yhdistää kaksi JavaBean-muotoista paikkatietolähdettä, suodattaa niistä oikeat arvot, kuristaa se päivittymään vain kerran 30 sekunnissa ja tulostaa paikka lopulta N/E-koordinaattimuodossa, sen voisi tehdä Observablena näiden operaattoreiden avulla seuraavasti:

Observable<Point2D> obs1 = ...
Observable<Point2D> obs2 = ...

obs1.mergeWith(obs2)
  .filter(o -> o != null)
  .throttleLast(30, TimeUnit.SECONDS) 
  .map(p -> "N " + p.getX() + " E " + p.getY())
  .subscribe(s -> System.out.println(s));

Saman toteuttaminen puhtaasti JavaBeanin keinoin olisi suhteellisen hankalaa, lähinnä kuristaminen vaatisi paljon virhealtista työtä. Juuri tällaiset valmiit ajalliset operaattorit ovat Rx:n merkittävimpiä etuja.

Seuraavassa osassa kehitetään RxJavalla pieni esimerkkisovellus ja tutustutaan sitä kautta paremmin joihinkin mielenkiintoisempiin RxJavan kymmenistä operaattoreista.
lukija.onCompleted()