Reactive Programming - RxJava

In the second of our Reactive mini-series, Jack Card turns his attention to getting up and running with RXJava version 3.


03-02-2021
Bcorp Logo

Our instructor Jack Card talks about what Reactive means for developers not working on GUI / Web Front End applications

Reactive Programming - RxJava

Jack Card turns his attention to RxJava...

In the first instalment of our mini-series of blog pieces, we took a look at ReactiveX which represents the core reactive concepts of Observables, Observers, Subjects and Operators.  

Java's Reactive extensions allow you to manipulate multiple actions and apply functional transformations simultaneously - which when done right, can be an elegant and efficient solution to certain programmatic challenges.

The ReactiveX  API began life as a Microsoft .NET implementation called Reactive Extensions, and we'll be exploring that in a future post.

In this blog we'll be looking at getting up and running with RxJava version 3.

Using RxJava

You will need to add a dependency to your project to use the RxJava library - the exact manner you do this will be dictated by how you normally manage library dependencies.

For instance, if you are using Gradle to manage project dependencies, then you should add the following to your compile dependencies:

implementation "io.reactivex.rxjava3:rxjava:3.0.9"

...whereas, if you are using Maven, then you should add the following to your Maven dependencies:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxjava</artifactId>
    <version>3.0.9</version>
</dependency>

Alternatively you can add RxJava directly to your IntelliJ or Eclipse project using the dependency management tool provided (for example see the IntelliJ IDEA documentation or Eclipse documentation for more details).

Reactive Programming - RxJava


Observables in RxJava

The core class in RxJava is 

io.reactivex.rxjava3.core.Observable 

...which is a class that can be used publish or emit (these terms can be used interchangeably) data. It also supports one or more subscribers (aka Observers) registering with it. This allows any data emitted to be processed by one or more Observers.

An Observable can be created to emit data from static data (such as a list) or from dynamic sources. Observables can be chained together to control how and when data is published, to transform data before it is published and to restrict what data is actually published.

For example, to create an Observable from a list of values we can use the Observable.fromIterable() factory method. This method is used to create the new Observable object from an iterable object such as a list. This is illustrated below:

import io.reactivex.rxjava3.core.Observable;

import java.util.Arrays;
import java.util.List;

public class ReactiveApp1 {

  public static void main(String [] args) {

    List<String> list = Arrays.asList("John", "Paul", "George", "Ringo", "Pete");
    System.out.println(list);
    Observable obs = Observable.fromIterable(list);
    System.out.println("observable: $observable");
  }

}

This program first creates a plain old Java list that is used to initialise the observable. In this case the observable will emit data from the static list data source.

When this program is run the output generated is:

[John, Paul, George, Ringo, Pete]
observable: $observable

Observers in RxJava

We can add an Observer to an Observable using the subcribe() method. This method can be supplied with any one of the following:

  • a lambda function,
  • an anonymous function
  • a named function
  • an object whose class implements the Observer interface.

For example, the simplest way to create an Observer is to use a lambda function. This function will be invoked when data is emitted by the observable:

obs.sub
scribe(item -> System.out.println(item));

When the Observable (obs) emits data the lambda function will be invoked. Each data item emitted will be supplied independently to the function. The output from the above subscription for the previous Observable is:

     John
     Paul<img src="/uploads/blog_images/RXJava/Java.jpg" width="385" height="293">
     George
     Ringo
     Pete

Another version of the subscribe() method takes three parameters. These parameters are:

  • onNext which is a function to be invoked for each data item generated by the Observable. The function returns void.
  • onError which is a function to be invoked upon exceptional termination of the Observable sequence. It is supplied with the Throwable exception or error. The function returns void.
  • onCompleted which is a function to be invoked upon graceful termination of the Observable sequence. It takes no parameters and returns void.

For example:

obs.subscribe(
  item -> System.out.println(item),  // onNext
  exp -> System.out.println(exp),    // onError
  () -> System.out.println("Done")   // onComplete
);

The above code defines three lambda functions that will be called depending upon whether data is supplied by the Observable, if an error occurs or when the data stream is terminated. The output from this is:

John
Paul
George
Ringo
Pete
Done

Note that the onError function is not run as no error was generated in this example.

The final option is to use an instance of a class that implements the Observer<T> interface. The instance will receive notifications from the Observable in the just the same way as the lambda functions. However, the behaviour to be invoked will be provided by methods defined in the Observer<T> interface.  The type to be handled by the observer is given by the type T. The interface defines the following four methods:

  • void onNext(T t) Provides the Observer with a new item to observe.
  • void onError(Throwable e) Notifies the Observer that the Observable has experienced an error condition.
  • void onComplete() Notifies the Observer that the Observable has finished sending push-based notifications.
  • void onSubscribe(Disposable d) Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within onNext()) and asynchronous manner.

An example of a class implementing the Observer<T> interface is given below. This observer can consume Strings published by an Observable:

class MyObserver implements Observer<String> {

  public void onSubscribe(Disposable d) {
    System.out.println("Subscribed");
  }

  public void onNext(String s) {
    System.out.println("Next item " + s);
  }

  public void onError(Throwable e) {
    System.out.println("Oops - An Error " + e);
  }

  public void onComplete() {
    System.out.println("Complete");
  }
}

This MyObserver class merely prints out log information so that the behaviour of the Observer can be seen.

Instances of this class can now be used as an Observer via the subscribe() method:

obs.subscribe(new MyObserver());

The output from this example using the previous Observable is:

Subscribed
Next item John
Next item Paul
Next item George
Next item Ringo
Next item Pete
Complete

Note that while the onCompleted() method is also called, the onErrror() method is not as there were no exceptions generated.

Multiple Subscribers / Observers

An Observable can have multiple Observers subscribed to it. In this case each of the Observers is sent all of the data published by the Observable.

Multiple Observers can be registered with an Observable by calling the subscribe method multiple times.  For example, the code has three subscribes:

obs.subscribe(new MyObserver());
obs.subscribe(item -> System.out.print("func1: " + item + ", "));
System.out.println();
obs.subscribe(item -> System.out.print("func2: " + item + ", "));
System.out.println();

The output from this program is:

Subscribed
Next item John
Next item Paul
Next item George
Next item Ringo
Next item Pete
Complete
func1: John, func1: Paul, func1: George, func1: Ringo, func1: Pete, 
func2: John, func2: Paul, func2: George, func2: Ringo, func2: Pete,


Summary

RxJava lets you use functional transformations over streams of events without having to resort to Callbacks and Global State Management.


Would you like to know more?

If you found this article interesting and you're ready to take the next step, take a look at some of our courses which feature modules on Reactive Programming concepts:

Share this post on:

We would love to hear from you

Get in touch

or call us on 020 3137 3920