In the second instalment of our Reactive mini-series, Jack Card turns his attention to getting up and running with RxJava version 3.
03-02-2021
Our instructor Jack Card talks about what Reactive means for developers not working on GUI / Web Front End applications
In the first instalment of our mini-series of blog pieces, we took a look at ReactiveX, which defines the core reactive concepts of Observables, Observers, Subjects and Operators.
Java's Reactive extensions allow you to combine multiple actions and apply functional transformations to the data flowing between them, which, when done right, can be an elegant and efficient solution to certain programming challenges.
The ReactiveX API began life as a Microsoft .NET implementation called Reactive Extensions, and we'll be exploring that in a future post.
In this blog we'll be looking at getting up and running with RxJava version 3.
Using RxJava
You will need to add a dependency to your project to use the RxJava library. Exactly how you do this will depend on how you normally manage library dependencies.
For instance, if you are using Gradle to manage project dependencies, then you should add the following to your compile dependencies:
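The dependency line itself does not appear above. As a minimal sketch for a Gradle build script (Groovy DSL): the group and artifact names below are RxJava 3's published Maven coordinates, but the version number is an assumption and should be replaced with whichever 3.x release you intend to use. Recent Gradle versions use the implementation configuration in place of the older compile one.

```groovy
dependencies {
    // Version number is an assumption; substitute the 3.x release you need
    implementation "io.reactivex.rxjava3:rxjava:3.1.8"
}
```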
Alternatively you can add RxJava directly to your IntelliJ or Eclipse project using the dependency management tool provided (for example see the IntelliJ IDEA documentation or Eclipse documentation for more details).
Observables in RxJava
The core class in RxJava is
io.reactivex.rxjava3.core.Observable
...which is a class that can be used to publish or emit (these terms can be used interchangeably) data. It also supports one or more subscribers (aka Observers) registering with it, so that any data emitted can be processed by one or more Observers.
An Observable can be created to emit data from static data (such as a list) or from dynamic sources. Observables can be chained together to control how and when data is published, to transform data before it is published and to restrict what data is actually published.
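To make the idea of chaining concrete, here is a minimal sketch that restricts and then transforms the data emitted from a static list. The class and method names are hypothetical; filter(), map(), toList() and blockingGet() are standard RxJava 3 operators, and blocking for the result is used only to keep the example short.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.Arrays;
import java.util.List;

public class ChainedObservableSketch {

    // Builds a chain: source -> filter (restrict) -> map (transform),
    // then gathers everything that was emitted into a list.
    static List<String> shortUpperCaseNames() {
        List<String> names = Arrays.asList("John", "Paul", "George", "Ringo", "Pete");
        return Observable.fromIterable(names)
                .filter(name -> name.length() <= 4) // restrict what is published
                .map(String::toUpperCase)           // transform before publishing
                .toList()                           // Single<List<String>>
                .blockingGet();                     // wait for the collected result
    }

    public static void main(String[] args) {
        System.out.println(shortUpperCaseNames()); // prints [JOHN, PAUL, PETE]
    }
}
```

Each operator returns a new Observable, so the original source is left untouched and other chains can be built from it.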
For example, to create an Observable from a list of values we can use the Observable.fromIterable() factory method. This method is used to create the new Observable object from an iterable object such as a list. This is illustrated below:
import io.reactivex.rxjava3.core.Observable;
import java.util.Arrays;
import java.util.List;

public class ReactiveApp1 {
    public static void main(String[] args) {
        // A plain Java list used as the static data source
        List<String> list = Arrays.asList("John", "Paul", "George", "Ringo", "Pete");
        System.out.println(list);
        // Create an Observable that will emit each element of the list
        Observable<String> obs = Observable.fromIterable(list);
        System.out.println("observable: " + obs);
    }
}
This program first creates a plain old Java list that is used to initialise the observable. In this case the observable will emit data from the static list data source.
We can add an Observer to an Observable using the subscribe() method. This method can be supplied with any one of the following:
a lambda function,
an anonymous function (in Java, an instance of an anonymous class),
a named function (passed as a method reference),
an object whose class implements the Observer interface.
For example, the simplest way to create an Observer is to use a lambda function. This function will be invoked when data is emitted by the observable:
obs.subscribe(item -> System.out.println(item));
When the Observable (obs) emits data the lambda function will be invoked. Each data item emitted will be supplied independently to the function. The output from the above subscription for the previous Observable is:
John
Paul
George
Ringo
Pete
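As a sketch of two of the other options listed earlier, the named function is passed here as a method reference and the anonymous function is written as an anonymous class implementing RxJava's Consumer interface. The class name, the record() method and the shortened list are hypothetical, and items are collected in a list rather than printed directly so the result is easy to inspect.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.functions.Consumer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SubscribeFormsSketch {

    // Everything received by either subscriber is recorded here.
    static final List<String> received = new ArrayList<>();

    // A "named function": an ordinary method, passed as a method reference.
    static void record(String item) {
        received.add("named: " + item);
    }

    static List<String> run() {
        received.clear();
        Observable<String> obs = Observable.fromIterable(Arrays.asList("John", "Paul"));

        // 1. Method reference to the named method above
        obs.subscribe(SubscribeFormsSketch::record);

        // 2. Anonymous class implementing the Consumer interface
        obs.subscribe(new Consumer<String>() {
            @Override
            public void accept(String item) {
                received.add("anonymous: " + item);
            }
        });

        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```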
Another version of the subscribe() method takes three parameters. These parameters are:
onNext which is a function to be invoked for each data item generated by the Observable. The function returns void.
onError which is a function to be invoked upon exceptional termination of the Observable sequence. It is supplied with the Throwable exception or error. The function returns void.
onComplete which is a function to be invoked upon graceful termination of the Observable sequence. It takes no parameters and returns void.
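A minimal sketch of this three-parameter form, reusing the list from the earlier example, might look like the following. The class name ReactiveApp2 and the exact messages printed are assumptions chosen to be consistent with the output shown next.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.Arrays;
import java.util.List;

public class ReactiveApp2 {
    public static void main(String[] args) {
        List<String> list = Arrays.asList("John", "Paul", "George", "Ringo", "Pete");
        Observable<String> obs = Observable.fromIterable(list);

        obs.subscribe(
                item -> System.out.println(item),               // onNext: called once per item
                error -> System.out.println("Error: " + error), // onError: called if the stream fails
                () -> System.out.println("Done"));              // onComplete: called on normal termination
    }
}
```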
With three lambda functions supplied, one is called for each data item supplied by the Observable, one if an error occurs, and one when the data stream terminates. For the previous Observable, with an onComplete function that prints Done, the output is:
John
Paul
George
Ringo
Pete
Done
Note that the onError function is not run as no error was generated in this example.
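For contrast, here is a hedged sketch of a stream that does fail part-way through. The failure is simulated with a map() step that throws for one particular name; the class name and the messages are hypothetical. The items emitted before the failure reach onNext, the exception is delivered to onError, and onComplete is never called.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ErrorPathSketch {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Observable.fromIterable(Arrays.asList("John", "Paul", "George", "Ringo", "Pete"))
                .map(name -> {
                    // Simulated failure, purely for illustration
                    if (name.equals("George")) {
                        throw new IllegalStateException("cannot process " + name);
                    }
                    return name;
                })
                .subscribe(
                        item -> log.add(item),
                        error -> log.add("Error: " + error.getMessage()),
                        () -> log.add("Done"));
        return log;
    }

    public static void main(String[] args) {
        // prints John, Paul, then "Error: cannot process George"
        run().forEach(System.out::println);
    }
}
```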
The final option is to use an instance of a class that implements the Observer<T> interface. The instance will receive notifications from the Observable in just the same way as the lambda functions. However, the behaviour to be invoked will be provided by methods defined in the Observer<T> interface. The type to be handled by the observer is given by the type T. The interface defines the following four methods:
void onNext(T t) Provides the Observer with a new item to observe.
void onError(Throwable e) Notifies the Observer that the Observable has experienced an error condition.
void onComplete() Notifies the Observer that the Observable has finished sending push-based notifications.
void onSubscribe(Disposable d) Provides the Observer with the means of cancelling (disposing) the connection (channel) with the Observable in both synchronous (from within onNext()) and asynchronous manner.
An example of a class implementing the Observer<T> interface is given below. This observer can consume Strings published by an Observable:
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;

class MyObserver implements Observer<String> {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("Subscribed");
    }
    @Override
    public void onNext(String s) {
        System.out.println("Next item " + s);
    }
    @Override
    public void onError(Throwable e) {
        System.out.println("Oops - An Error " + e);
    }
    @Override
    public void onComplete() {
        System.out.println("Complete");
    }
}
This MyObserver class merely prints out log information so that the behaviour of the Observer can be seen.
Instances of this class can now be used as an Observer via the subscribe() method:
obs.subscribe(new MyObserver());
The output from this example using the previous Observable is:
Subscribed
Next item John
Next item Paul
Next item George
Next item Ringo
Next item Pete
Complete
Note that while the onComplete() method is also called, the onError() method is not, as there were no exceptions generated.
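The Disposable passed to onSubscribe() can be kept and used to cancel the subscription early. The sketch below, with hypothetical class names, stops after two items by calling dispose() from within onNext(). Once disposed, this Observable emits nothing further to the Observer, so onComplete() is not called either.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class DisposingObserverSketch {

    // Records what it sees and cancels the subscription after two items.
    static class FirstTwoObserver implements Observer<String> {
        final List<String> log = new ArrayList<>();
        private Disposable subscription;
        private int count = 0;

        @Override
        public void onSubscribe(Disposable d) {
            subscription = d; // keep the handle so we can cancel later
            log.add("Subscribed");
        }

        @Override
        public void onNext(String s) {
            log.add("Next item " + s);
            if (++count == 2) {
                subscription.dispose(); // synchronous cancellation from onNext()
            }
        }

        @Override
        public void onError(Throwable e) {
            log.add("Oops - An Error " + e);
        }

        @Override
        public void onComplete() {
            log.add("Complete");
        }
    }

    static List<String> run() {
        FirstTwoObserver observer = new FirstTwoObserver();
        Observable.fromIterable(Arrays.asList("John", "Paul", "George", "Ringo", "Pete"))
                .subscribe(observer);
        return observer.log;
    }

    public static void main(String[] args) {
        // prints Subscribed, Next item John, Next item Paul
        run().forEach(System.out::println);
    }
}
```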
Multiple Subscribers / Observers
An Observable can have multiple Observers subscribed to it. In this case each of the Observers is sent all of the data published by the Observable.
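Multiple Observers can be registered with an Observable by calling the subscribe() method multiple times. For example, subscribing a MyObserver instance and then two lambda functions (labelled func1 and func2 in the output) to the previous Observable produces: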
Subscribed
Next item John
Next item Paul
Next item George
Next item Ringo
Next item Pete
Complete
func1: John, func1: Paul, func1: George, func1: Ringo, func1: Pete,
func2: John, func2: Paul, func2: George, func2: Ringo, func2: Pete,
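Code of the following shape would produce output like that shown above. This is a sketch under assumptions: the class name is hypothetical, LoggingObserver simply repeats the behaviour of MyObserver so that the example is self-contained, and the func1 and func2 labels are taken from the output.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Observer;
import io.reactivex.rxjava3.disposables.Disposable;
import java.util.Arrays;
import java.util.List;

public class MultipleSubscribersSketch {

    // Same logging behaviour as the MyObserver class shown earlier.
    static class LoggingObserver implements Observer<String> {
        @Override
        public void onSubscribe(Disposable d) { System.out.println("Subscribed"); }
        @Override
        public void onNext(String s) { System.out.println("Next item " + s); }
        @Override
        public void onError(Throwable e) { System.out.println("Oops - An Error " + e); }
        @Override
        public void onComplete() { System.out.println("Complete"); }
    }

    public static void main(String[] args) {
        List<String> list = Arrays.asList("John", "Paul", "George", "Ringo", "Pete");
        Observable<String> obs = Observable.fromIterable(list);

        // First subscriber: an Observer object
        obs.subscribe(new LoggingObserver());

        // Second and third subscribers: lambda functions
        obs.subscribe(item -> System.out.print("func1: " + item + ", "));
        System.out.println();
        obs.subscribe(item -> System.out.print("func2: " + item + ", "));
        System.out.println();
    }
}
```

Because this Observable is built from a static list, each call to subscribe() replays the whole list to that subscriber before the next call runs, which is why the three blocks of output appear one after another.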
Summary
RxJava lets you use functional transformations over streams of events without having to resort to Callbacks and Global State Management.