Reactive Kotlin: RxKotlin

In the third in our Reactive mini-series, Jack Card looks at the RxKotlin library. 

15-03-2021

Building on our introduction to reactive systems and Reactive Java, in this blog we will look at the RxKotlin library. This is a set of extensions to the RxJava library that allow for idiomatic Kotlin to be written. 

Both RxKotlin and RxJava are part of the wider ReactiveX (Reactive Extensions) project. 

Using RxKotlin

As with RxJava, you will need to add RxKotlin to your project as a dependency. For example, if you use Gradle, add the RxKotlin library to the dependencies in your build.gradle file:

implementation "io.reactivex.rxjava3:rxkotlin:3.0.1"

Whereas if you are a Maven user you can add the same dependency to your pom.xml file using the following:

<dependency>
    <groupId>io.reactivex.rxjava3</groupId>
    <artifactId>rxkotlin</artifactId>
    <version>3.0.1</version>
</dependency>


Why have a separate Kotlin library?

If you are familiar with Kotlin you may ask ‘why have a separate RxKotlin library at all?’. 

After all, Kotlin can directly access any Java library, and Kotlin-to-Java invocations are certainly straightforward. The answer lies in Kotlin extensions, a facility that allows a developer to extend an existing type without directly modifying that type or inheriting from it. 

These extensions can be scoped so that they are only available when required.
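
As a minimal sketch of the idea, the following adds a function to the existing List<String> type. The name commaSeparated() is hypothetical and chosen purely for illustration; it is not part of RxKotlin or the Kotlin standard library.

```kotlin
// Hypothetical extension function: adds commaSeparated() to List<String>
// without modifying List or inheriting from it
fun List<String>.commaSeparated(): String = joinToString(", ")

fun main() {
    // The extension is called as if it were a member of List<String>
    println(listOf("John", "Paul", "George").commaSeparated())
}
```

Code in other files can only call commaSeparated() once the extension has been imported, which is how extensions are kept out of scope until they are needed.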

Kotlin extensions allow features to be added to the underlying RxJava library, and allow a style of programming which is more in keeping with Kotlin. For example, in RxJava, to create an Observable from a static list of data we might write:

List<String> list = 
         Arrays.asList("John", "Paul", "George", "Ringo", "Pete");
Observable<String> obs = Observable.fromIterable(list);

...while in Kotlin we can write:

val observable = 
         listOf("John", "Paul", "George", "Ringo", "Pete")
             .toObservable()


Key RxKotlin Classes

Several core classes make up the RxJava/RxKotlin framework; the two key classes are:

  • io.reactivex.rxjava3.core.Observable:
    emits zero or more data items but does not support backpressure.
  • io.reactivex.rxjava3.core.Flowable:
    emits zero or more data items and also supports the concept of backpressure (discussed later in this blog).


Observables

Observables are the simplest approach and have already been discussed in the previous two blogs. However, for reference here is an example of using a simple observable created from a static list of values:

import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.kotlin.toObservable

fun main() {
    // Create an observable using data in a list
    val observable = listOf(0, 1, 2, 3, 5, 7).toObservable()

    // Subscribe a simple lambda function
    observable.subscribeBy { print("$it, ") }
}

The program creates an observable from a list of numbers; once a subscriber is registered, the observable emits (or publishes) these data items to its data stream. A single subscriber is registered with the observable and will receive each data item in turn. Each data item is bound to the implicit parameter it of the lambda function, which is passed using Kotlin's trailing lambda syntax to subscribeBy{}, an extension function supplied by RxKotlin.

The output from this program is:

0, 1, 2, 3, 5, 7,
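
The trailing lambda above supplies only the onNext handler. As a sketch, subscribeBy{} can also be given named handlers for errors and completion. The recordEvents() helper below is a hypothetical name used only so that the events can be collected and inspected.

```kotlin
import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.kotlin.toObservable

// Hypothetical helper: subscribes to a list-backed observable and
// records every event that the subscriber receives
fun recordEvents(values: List<Int>): List<String> {
    val events = mutableListOf<String>()
    values.toObservable().subscribeBy(
        onNext = { events += "next $it" },
        onError = { events += "error ${it.message}" },
        onComplete = { events += "complete" }
    )
    return events
}

fun main() {
    println(recordEvents(listOf(2, 3, 5)))
}
```

Because a list-backed observable emits on the subscribing thread, all of the events have been recorded by the time recordEvents() returns.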

We can also create a dynamic version of this program which will publish data generated on the fly rather than hard-coded within a list. This can be done using the Observable.create{} factory function.

Observable.create<T>{} takes a function that will be executed to create the Observable. This function can be a lambda function (which is the most commonly used approach), an anonymous function or a named function accessed via a callable reference.

The function passed to Observable.create<T>{} is passed an ObservableEmitter instance. This can be used to emit (or publish) data to the data stream generated for the Observable. 
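
The three ways of supplying the function can be sketched as follows. The function names used here (emitOneAndTwo, fromLambda and so on) are hypothetical and exist only for this illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableEmitter

// Hypothetical named function with the shape Observable.create expects
fun emitOneAndTwo(emitter: ObservableEmitter<Int>) {
    emitter.onNext(1)
    emitter.onNext(2)
    emitter.onComplete()
}

// 1. Lambda function (trailing lambda syntax)
fun fromLambda(): Observable<Int> =
    Observable.create<Int> { emitter -> emitOneAndTwo(emitter) }

// 2. Anonymous function
fun fromAnonymousFunction(): Observable<Int> =
    Observable.create<Int>(fun(emitter: ObservableEmitter<Int>) {
        emitOneAndTwo(emitter)
    })

// 3. Named function accessed via a callable reference
fun fromCallableReference(): Observable<Int> =
    Observable.create<Int>(::emitOneAndTwo)

fun main() {
    // Each observable emits the same two values and then completes
    listOf(fromLambda(), fromAnonymousFunction(), fromCallableReference())
        .forEach { println(it.toList().blockingGet()) }
}
```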

There are a range of member functions available on the ObservableEmitter including:

  • onNext(T value): Unit
    This emits a new value to the data stream associated with the Observable.
  • onComplete(): Unit
    This indicates that the Observable will not send any more data. In RxKotlin terminology this signals a completion.
  • onError(Throwable error): Unit
    This indicates that an exception has occurred in the observable. The subscribers will thus have their onError behaviour invoked.

Note that the onNext(), onError() and onComplete() member functions must always be called one at a time, never concurrently, so care should be taken if they are to be called from separate threads.
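
As a minimal sketch of how these signals reach a subscriber, the observable below emits two values and then signals an error. The names failingObservable() and collectSignals(), and the error message, are hypothetical.

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.kotlin.subscribeBy

// Hypothetical observable: emits two values and then signals an error
fun failingObservable(): Observable<Int> =
    Observable.create<Int> { emitter ->
        emitter.onNext(1)
        emitter.onNext(2)
        emitter.onError(IllegalStateException("sensor failed"))
    }

// Hypothetical helper: records the signals delivered to a subscriber
fun collectSignals(source: Observable<Int>): List<String> {
    val signals = mutableListOf<String>()
    source.subscribeBy(
        onNext = { signals += "onNext($it)" },
        onError = { signals += "onError(${it.message})" },
        onComplete = { signals += "onComplete" }
    )
    return signals
}

fun main() {
    println(collectSignals(failingObservable()))
}
```

An error ends the data stream, so the subscriber sees the two values followed by the error and never receives a completion signal.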

An example is given below of using Observable.create<T>{} to create a prime number generator observable that dynamically generates prime numbers up to a maximum value:

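import io.reactivex.rxjava3.core.Observable

// Simple trial-division check. Note that it also returns true
// for 0 and 1, which is why they appear in the output.
fun checkIsPrimeNumber(num: Int): Boolean {
    for (i in 2..num / 2) {
        if (num % i == 0) return false
    }
    return true
}

// Create an Observable that emits each number below maxNumber
// that passes the check, and then signals completion
fun generatePrimeNumberObservable(maxNumber: Int): Observable<Int> =
    Observable.create<Int> { emitter ->
        for (currentNumber in 0 until maxNumber) {
            if (checkIsPrimeNumber(currentNumber))
                emitter.onNext(currentNumber)
        }
        emitter.onComplete()
    }

fun main() {
    // Subscribe a simple lambda that prints each number received
    generatePrimeNumberObservable(11)
        .subscribe { print("$it, ") }
    println()
}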

The generatePrimeNumberObservable() function passes a lambda function to Observable.create<Int>{}; this lambda emits each prime number it calculates below the supplied maximum number. 

The Observable created by Observable.create<Int>{} is then returned from the generatePrimeNumberObservable() function. In the main function a subscriber is registered with the returned Observable; it prints out each of the numbers supplied to it.

The output from this program is:

0, 1, 2, 3, 5, 7, 
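
Note that the simple check used above reports 0 and 1 as prime, which is why they appear in the output. A stricter test might look like the following sketch; the name isPrime() is hypothetical and is not used elsewhere in this blog.

```kotlin
// Hypothetical stricter check: rejects numbers below 2 and only tries
// divisors up to the square root of the candidate
fun isPrime(num: Int): Boolean {
    if (num < 2) return false
    var divisor = 2
    // divisor <= num / divisor avoids overflow in divisor * divisor
    while (divisor <= num / divisor) {
        if (num % divisor == 0) return false
        divisor++
    }
    return true
}

fun main() {
    println((0 until 11).filter(::isPrime))
}
```

Using this function in place of checkIsPrimeNumber() would remove 0 and 1 from the emitted values.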


Flowables

A Flowable is a data publisher that emits its data according to the demand received from its Subscribers. This means that it is the Subscribers that determine when a Publisher emits its data. This is known as back pressure. The Reactive Manifesto says of back pressure:

"When one component is struggling to keep-up, the system as a whole needs to respond in a sensible way. It is unacceptable for the component under stress to fail catastrophically or to drop messages in an uncontrolled fashion. Since it can’t cope and it can’t fail it should communicate the fact that it is under stress to upstream components and so get them to reduce the load. This back pressure is an important feedback mechanism that allows systems to gracefully respond to load rather than collapse under it."

Thus, back pressure is the feature within a reactive system that allows the Observers / Subscribers (the consumers of the data) to indicate whether they can process the emitted data fast enough or not. Flowables support this feedback mechanism from Observers (or Subscribers); plain old Observables do not.
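
To make the idea of demand concrete, here is a minimal sketch in which the subscriber asks for one item at a time. It assumes RxJava's Flowable.range() factory and DefaultSubscriber class, neither of which is otherwise covered in this blog; the consumeOneAtATime() helper is a hypothetical name.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.subscribers.DefaultSubscriber

// Hypothetical helper: consumes a Flowable one item at a time
// and logs what the subscriber receives
fun consumeOneAtATime(source: Flowable<Int>): List<String> {
    val log = mutableListOf<String>()
    source.subscribe(object : DefaultSubscriber<Int>() {
        // Ask for the first item only, rather than for everything
        override fun onStart() = request(1)

        override fun onNext(item: Int) {
            log += "received $item"
            // Signal readiness for exactly one more item
            request(1)
        }

        override fun onError(error: Throwable) {
            log += "error ${error.message}"
        }

        override fun onComplete() {
            log += "complete"
        }
    })
    return log
}

fun main() {
    consumeOneAtATime(Flowable.range(1, 3)).forEach(::println)
}
```

The publisher here never emits more than the subscriber has requested, which is the feedback mechanism that a plain Observable lacks.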

To create a flowable you can use the toFlowable() extension function provided by RxKotlin. This can be used to create a flowable directly from a collection such as a list. For example:

import io.reactivex.rxjava3.kotlin.subscribeBy
import io.reactivex.rxjava3.kotlin.toFlowable

fun main() {
    val flowable = listOf(0, 1, 2, 3, 5, 7)
        .toFlowable()
    flowable.subscribeBy { print("$it, ") }
    println()
}

This program generates:

0, 1, 2, 3, 5, 7,

We can also create a dynamic version of the above. The easiest way to do that is to create a plain old Observable and then convert it into a Flowable using the toFlowable() member function. When you do this, you need to specify the back pressure strategy to be adopted by the flowable. The options are all defined in the io.reactivex.rxjava3.core.BackpressureStrategy enum. These options are:

  • BackpressureStrategy.BUFFER
    This indicates that if the flowable produces more data items than the subscribers can handle, the extra items are buffered in an unbounded queue.
  • BackpressureStrategy.DROP
    This indicates that any data items that the subscriber can’t handle should be dropped.
  • BackpressureStrategy.ERROR
    This causes the reactive system to signal an error (a MissingBackpressureException) when the downstream subscribers can’t keep up.
  • BackpressureStrategy.LATEST
    This indicates that the data stream should only keep the latest item emitted by onNext(), overwriting any previous values.
  • BackpressureStrategy.MISSING
    This indicates that there are no safety measures in place and no guarantees; it is up to your application to handle any issues.
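
The effect of a strategy can be sketched with a subscriber that only ever asks for three items. The example assumes RxJava's Observable.range() factory and DefaultSubscriber class; the takeThreeWithDrop() helper is a hypothetical name.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.subscribers.DefaultSubscriber

// Hypothetical helper: the source emits ten items but the subscriber
// only requests three, so DROP discards the remaining seven
fun takeThreeWithDrop(): List<Int> {
    val received = mutableListOf<Int>()
    Observable.range(1, 10)
        .toFlowable(BackpressureStrategy.DROP)
        .subscribe(object : DefaultSubscriber<Int>() {
            // Demand three items and never ask for more
            override fun onStart() = request(3)

            override fun onNext(item: Int) {
                received += item
            }

            override fun onError(error: Throwable) {
                println("error: $error")
            }

            override fun onComplete() {
                println("complete")
            }
        })
    return received
}

fun main() {
    println(takeThreeWithDrop())
}
```

Swapping in BackpressureStrategy.ERROR should instead deliver a MissingBackpressureException to onError once the requested items have been used up.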

Using the functions created earlier in this blog, we can create a Flowable from the prime numbers observable by calling toFlowable(BackpressureStrategy.LATEST), as shown below:

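import io.reactivex.rxjava3.core.BackpressureStrategy

fun main() {
    // Convert the Observable into a Flowable that keeps only the latest item
    generatePrimeNumberObservable(11)
        .toFlowable(BackpressureStrategy.LATEST)
        .subscribe { print("$it, ") }
    println()
}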

The output from this is again:

0, 1, 2, 3, 5, 7,


Summary

The RxKotlin extensions allow a more Kotlin-like style in your applications. Using these libraries, it is possible to create Observables and Observers as well as to exploit back pressure using Flowables.

