Netting Reactive Systems: RxNET

In the fourth of our Reactive mini-series, Jack Card explores the facilities provided in Rx.NET for use with the C# language.


23-03-2021

In the last three blogs, we introduced the core concepts behind Reactive Systems and explored both Reactive Java (RxJava) and Reactive Kotlin (RxKotlin).

From this it might appear that the ReactiveX project has focussed primarily on JVM languages; however, this is not true. In fact, the ReactiveX project provides implementations of its model for Python (RxPY), JavaScript (RxJS) and C++ (RxCpp), as well as for .NET languages such as C# (Rx.NET).

In this blog we will explore the facilities provided in Rx.NET for use with the C# language.

Rx.NET

At the time of writing, the latest release of Rx.NET is version 5, which can be obtained using NuGet (as the System.Reactive package); this is the version that targets .NET 5.

As with other implementations of the ReactiveX model, the core concepts in Rx.NET are Observables and Observers. An Observer is a consumer of a sequence of values which may be published or emitted by an Observable.

Between the Observable and the Observer is a data stream. The data stream carries the data emitted by the Observable while it is being transported to the Observers. This model is illustrated in the following diagram:

[Diagram: an Observable emitting data along a data stream to its Observers]

Observables and Observers

In the Rx.NET library the Observable is represented by the IObservable<T> interface. This interface defines a single method used to subscribe an observer to an Observable. The interface is defined as:

public interface IObservable<T>
{
    IDisposable Subscribe(IObserver<T> observer);
}

In turn there is an interface IObserver<T> which is used to define an Observer which can receive data emitted by an Observable of type T. This interface defines three methods, OnNext(T value), OnError(Exception error) and OnCompleted(), as shown below:

public interface IObserver<T>
{
    void OnNext(T value);
    void OnError(Exception error);
    void OnCompleted();
}


The three methods defined in the IObserver<T> interface are explained below:

  • void OnNext(T value)
    Provides the Observer with a new item to observe.
  • void OnError(Exception error)
    Notifies the Observer that the Observable has generated an exception.
  • void OnCompleted()
    Notifies the Observer that the Observable has finished sending data.
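
To make the contract between the two interfaces concrete, here is a minimal sketch that uses only the interfaces from the System namespace, without the Rx.NET library itself. The class names FixedValuesObservable, RecordingObserver and NothingToDispose are hypothetical and exist only for this illustration. It shows the expected call pattern: zero or more calls to OnNext(), followed by at most one call to either OnCompleted() or OnError().

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: pushes a fixed set of values to each subscriber.
public class FixedValuesObservable : IObservable<int>
{
    private readonly int[] _values;
    private readonly bool _shouldFail;   // true means "finish with an error"

    public FixedValuesObservable(int[] values, bool shouldFail = false)
    {
        _values = values;
        _shouldFail = shouldFail;
    }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        foreach (var value in _values)
        {
            observer.OnNext(value);              // zero or more items
        }
        if (_shouldFail)
        {
            observer.OnError(new InvalidOperationException("boom"));
        }
        else
        {
            observer.OnCompleted();              // normal end of the sequence
        }
        return new NothingToDispose();
    }

    // Nothing is left to clean up once Subscribe has returned.
    private sealed class NothingToDispose : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: records each notification it receives.
public class RecordingObserver : IObserver<int>
{
    public List<string> Calls { get; } = new List<string>();

    public void OnNext(int value) => Calls.Add($"OnNext({value})");
    public void OnError(Exception error) => Calls.Add($"OnError({error.Message})");
    public void OnCompleted() => Calls.Add("OnCompleted");
}

public static class ContractDemo
{
    public static void Main()
    {
        var completed = new RecordingObserver();
        new FixedValuesObservable(new[] { 1, 2 }).Subscribe(completed);
        // Prints: OnNext(1), OnNext(2), OnCompleted
        Console.WriteLine(string.Join(", ", completed.Calls));

        var failed = new RecordingObserver();
        new FixedValuesObservable(new[] { 1 }, shouldFail: true).Subscribe(failed);
        // Prints: OnNext(1), OnError(boom)
        Console.WriteLine(string.Join(", ", failed.Calls));
    }
}
```

In practice you would rarely implement IObservable<T> by hand like this; the factory methods provided by Rx.NET take care of details such as unsubscription.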


Creating Observables

An Observable can be created using the Observable.Create() factory method from the System.Reactive.Linq namespace. This method takes a lambda function that is run for each subscribing Observer and that emits a series of data items to it. Note that the lambda passed to Observable.Create() must return an IDisposable, which is used to clean up when the subscription ends. In the following example this is the default empty disposable, Disposable.Empty (as there is nothing left at this point to dispose), which is defined in the System.Reactive.Disposables namespace.

The sample program now looks like:

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace ReactiveConsoleApp
{
	class Program
	{
		public static IObservable<int> GetPrimeNumbersObservable()
		{
			return Observable.Create<int>(observer =>
			{
				observer.OnNext(0);
				observer.OnNext(1);
				observer.OnNext(2);
				observer.OnNext(3);
				observer.OnNext(5);
				observer.OnNext(7);
				observer.OnCompleted();
				return Disposable.Empty;
			});
		}

		static void Main(string[] args)
		{
		}
	}
}

    The method GetPrimeNumbersObservable() returns an IObservable<int>, as the observable emits integers. When an Observer subscribes, it emits the values 0, 1, 2, 3, 5 and 7 and then completes. (Strictly speaking, 0 and 1 are not prime numbers; the primes in the sequence are 2, 3, 5 and 7.)
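
One consequence of this design is worth seeing in code: the lambda given to Observable.Create() does not run when the observable is created, only when an observer subscribes, and it runs once per subscription. The sketch below illustrates this; the class ColdObservableDemo and its SubscribeCalls counter are hypothetical names used only for this example, and the Subscribe() overload that accepts a lambda is an Rx.NET extension method.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class ColdObservableDemo
{
    // Hypothetical counter: how many times the Create lambda has run.
    public static int SubscribeCalls;

    public static IObservable<int> CreateCounted()
    {
        return Observable.Create<int>(observer =>
        {
            SubscribeCalls++;
            observer.OnNext(2);
            observer.OnNext(3);
            observer.OnCompleted();
            return Disposable.Empty;
        });
    }

    public static void Main()
    {
        var source = CreateCounted();
        // Nothing has been emitted yet, so this prints 0.
        Console.WriteLine($"Lambda runs after Create: {SubscribeCalls}");

        source.Subscribe(v => Console.WriteLine($"First observer: {v}"));
        source.Subscribe(v => Console.WriteLine($"Second observer: {v}"));

        // Each subscription ran the lambda again, so this prints 2.
        Console.WriteLine($"Lambda runs after two subscriptions: {SubscribeCalls}");
    }
}
```

Each observer therefore receives its own copy of the full sequence, which is why this kind of observable is often described as "cold".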

    Alternatively, we could create an observable from static content using a list. This can be done using the ToObservable() method:

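    // Requires: using System.Collections.Generic; using System.Reactive.Concurrency;
    var list = new List<int> { 0, 1, 2, 3, 5, 7 };
    var observable = list.ToObservable(ThreadPoolScheduler.Instance);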
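
Because a thread pool scheduler is supplied, the items are delivered on a background thread, so a console program has to wait for completion before it exits. The following is a minimal sketch of one way to do that, using a ManualResetEventSlim; the class ToObservableDemo and the helper CollectOnThreadPool are hypothetical names for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ToObservableDemo
{
    // Hypothetical helper: subscribes on the thread pool and blocks
    // until the observable signals completion.
    public static List<int> CollectOnThreadPool(IEnumerable<int> items)
    {
        var received = new List<int>();
        var done = new ManualResetEventSlim(false);

        items.ToObservable(ThreadPoolScheduler.Instance)
             .Subscribe(
                 value => received.Add(value),   // runs on a pool thread
                 () => done.Set());              // completion callback

        done.Wait();                             // block the caller until completed
        return received;
    }

    public static void Main()
    {
        var list = new List<int> { 0, 1, 2, 3, 5, 7 };
        var received = CollectOnThreadPool(list);
        // Prints: 0, 1, 2, 3, 5, 7
        Console.WriteLine(string.Join(", ", received));
    }
}
```

If no scheduler is needed, calling ToObservable() without arguments avoids the hand-off to another thread.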

    Creating Observers

    We can now create a class that implements the IObserver<T> interface. As the Observable emits integers, we must actually implement IObserver<int>. This ensures that the Observer can handle the same type of data as is emitted by the Observable. This means that we will need to implement the OnCompleted(), OnError(Exception) and OnNext(int) methods. For example:

    public class SampleObserver: IObserver<int> 
    {
            public void OnCompleted()
            {
                Console.WriteLine("All data sent");
            }
     
            public void OnError(Exception error)
            {
                Console.WriteLine(
                      $"Observable generated: {error.Message}");
            }
     
            public void OnNext(int value)
            {
                Console.WriteLine($"Observer received: {value}");
            }
    }

    The class implemented above merely logs output to the console: it logs each integer it receives, logs when the Observable has completed sending data, and logs any error that is generated.
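
For small cases a dedicated observer class is not strictly necessary: Rx.NET also provides Subscribe() extension methods that accept the three callbacks as lambdas. The sketch below uses that overload and also exercises the error path, which the prime number example never triggers. The class LambdaObserverDemo, the helper Describe and the failing observable are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class LambdaObserverDemo
{
    // Hypothetical helper: subscribes with three lambdas and
    // returns the messages the callbacks produced.
    public static List<string> Describe(IObservable<int> source)
    {
        var log = new List<string>();
        source.Subscribe(
            value => log.Add($"Observer received: {value}"),          // OnNext
            error => log.Add($"Observable generated: {error.Message}"), // OnError
            () => log.Add("All data sent"));                          // OnCompleted
        return log;
    }

    public static void Main()
    {
        // An observable that emits one value and then fails.
        var failing = Observable.Create<int>(observer =>
        {
            observer.OnNext(42);
            observer.OnError(new InvalidOperationException("something failed"));
            return Disposable.Empty;
        });

        foreach (var line in Describe(failing))
        {
            Console.WriteLine(line);
        }
        // Prints:
        // Observer received: 42
        // Observable generated: something failed
    }
}
```

Note that "All data sent" is not printed here, as an error ends the sequence without a call to OnCompleted().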

    Connecting Everything up

    Now that we have an observable that can emit data and an observer that can receive data, we can connect them together. For example:

    public class Program
    {
        private static void Main(string[] args)
        {
            var observableInstance = 
                          GetPrimeNumbersObservable();
            var observerInstance = new SampleObserver();
            observableInstance.Subscribe(observerInstance);
            Console.WriteLine("\nPress ENTER to exit … \n");
            Console.ReadLine();
        }
    }

    When this program is executed the output generated is:

    Observer received: 0
    Observer received: 1
    Observer received: 2
    Observer received: 3
    Observer received: 5
    Observer received: 7
    All data sent
    Press ENTER to exit ...
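
The program above ignores the IDisposable returned by Subscribe(). That object represents the subscription, and disposing it is how an observer unsubscribes. The following sketch, under the assumption of an observable that never completes, shows the clean-up action running only when the subscription is disposed; the class UnsubscribeDemo and its Run method are hypothetical names for this example.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class UnsubscribeDemo
{
    // Returns whether the clean-up action had run before and after Dispose().
    public static (bool Before, bool After) Run()
    {
        var cleanedUp = false;

        var source = Observable.Create<int>(observer =>
        {
            observer.OnNext(2);
            observer.OnNext(3);
            // No OnCompleted() here: the sequence stays open until the
            // subscriber unsubscribes, at which point this action runs.
            return Disposable.Create(() => cleanedUp = true);
        });

        IDisposable subscription =
            source.Subscribe(v => Console.WriteLine($"Observer received: {v}"));

        var before = cleanedUp;
        subscription.Dispose();      // unsubscribe
        var after = cleanedUp;

        return (before, after);
    }

    public static void Main()
    {
        var (before, after) = Run();
        Console.WriteLine($"Cleaned up before Dispose: {before}");  // False
        Console.WriteLine($"Cleaned up after Dispose: {after}");    // True
    }
}
```

This matters most for long-running or infinite observables, where holding on to the subscription is the only way to stop receiving data.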
    


    Applying Operators

    Once you have set up a reactive application it is possible to apply operators to the data being emitted by an observable. That is, it is possible to apply one or more operators to the Observable's data stream. These operators can be used to filter, transform and generally refine how and when the data is supplied to the Observer.

    Rx.NET provides a range of operators that can be classified as being:

    • Creational operators - used to create Observables, such as ToObservable(),
    • Transformational operators - used to convert the data in some way,
    • Combinatorial operators - used to combine data streams together,
    • Filter operators - used to filter out values, such as Where().

    As an example, the following program filters out the even values (0 and 2) from the data emitted by the observable, leaving only the odd ones:

    public class Program
    {
        private static void Main(string[] args)
        {
            var observableInstance = 
                     GetPrimeNumbersObservable();
            var observerInstance = new SampleObserver();
            observableInstance
                    .Where(e => e % 2 != 0)
                    .Subscribe(observerInstance);
            Console.WriteLine("\nPress ENTER to exit … \n");
            Console.ReadLine();
        }
    }

    The output from this program is:

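    Observer received: 1
    Observer received: 3
    Observer received: 5
    Observer received: 7
    All data sent
    Press ENTER to exit ...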
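
Operators from the other categories can be chained in the same way. As a rough sketch, the following combines one operator from each category listed above: Observable.Range() and ToObservable() to create, Concat() to combine, Where() to filter and Select() to transform. The class OperatorCategoriesDemo and the choice of values are assumptions made for this illustration; ToList() followed by Wait() is used simply to block until the sequence completes and collect the results.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class OperatorCategoriesDemo
{
    public static IList<int> Run()
    {
        var firstPrimes = Observable.Range(2, 2);          // creational: emits 2, 3
        var morePrimes = new[] { 5, 7 }.ToObservable();    // creational: emits 5, 7

        return firstPrimes
            .Concat(morePrimes)     // combinatorial: 2, 3, 5, 7
            .Where(p => p > 2)      // filter: 3, 5, 7
            .Select(p => p * p)     // transformational: 9, 25, 49
            .ToList()               // gather all items into a single list
            .Wait();                // block until the sequence completes
    }

    public static void Main()
    {
        // Prints: 9, 25, 49
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```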


    Rx.NET v other Rx Libraries

    You may have noted that the names used in Rx.NET differ from those used in other ReactiveX implementations. This is because the implementors of the Rx.NET library decided to align the operator names with those used elsewhere within the .NET world, particularly within LINQ, rather than adopt the usual Rx conventions. This is helpful if you are coming to Rx.NET from other parts of the .NET world (which is probably the most likely scenario); however, it can be confusing if you are coming to Rx.NET from, say, RxJava. The following table lists some of the different names:

    [Table: Rx.NET operator names alongside their equivalents in other ReactiveX implementations]
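
To give a flavour of the naming differences in code, the sketch below annotates a few Rx.NET operators with the names commonly used for the same operations in RxJava. These correspondences are offered as a general illustration and are not taken from the table above; the class OperatorNamesDemo is a hypothetical name for this example.

```csharp
using System;
using System.Reactive.Linq;

public static class OperatorNamesDemo
{
    public static int Run()
    {
        return Observable.Range(1, 4)                     // emits 1, 2, 3, 4
            .Where(x => x % 2 == 0)                       // RxJava: filter  -> 2, 4
            .Select(x => x * 10)                          // RxJava: map     -> 20, 40
            .SelectMany(x => Observable.Return(x + 1))    // RxJava: flatMap -> 21, 41
            .Aggregate(0, (sum, x) => sum + x)            // RxJava: reduce  -> 62
            .Wait();                                      // block and return the final value
    }

    public static void Main()
    {
        // Prints: 62
        Console.WriteLine(Run());
    }
}
```

Anyone familiar with LINQ to Objects will recognise Where, Select, SelectMany and Aggregate, which is exactly the alignment described above.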


    Complete Program

    The complete program is given below for reference:

    using System;
    using System.Reactive.Disposables;
    using System.Reactive.Linq;
    
    namespace ReactiveConsoleApp
    {
    	public class SampleObserver : IObserver<int>
    	{
    		public void OnCompleted()
    		{
    			Console.WriteLine("All data sent");
    		}
    
    		public void OnError(Exception error)
    		{
    			Console.WriteLine(
    				  $"Observable generated: {error.Message}");
    		}
    
    		public void OnNext(int value)
    		{
    			Console.WriteLine($"Observer received: {value}");
    		}
    	}
    
    	class Program
    	{
    		public static IObservable<int> GetPrimeNumbersObservable()
    		{
    			return Observable.Create<int>(observer =>
    			{
    				observer.OnNext(0);
    				observer.OnNext(1);
    				observer.OnNext(2);
    				observer.OnNext(3);
    				observer.OnNext(5);
    				observer.OnNext(7);
    				observer.OnCompleted();
    				return Disposable.Empty;
    			});
    		}
    
    		static void Main(string[] args)
    		{
    			var observableInstance =
    					  GetPrimeNumbersObservable();
    			var observerInstance = new SampleObserver();
    			observableInstance.Subscribe(observerInstance);
    			Console.WriteLine("\nPress ENTER to exit … \n");
    			Console.ReadLine();
    
    		}
    	}
    }
