Reactive Glimpse-Part 1

A peek into Observables.


The reactive approach has opened up new dimensions of programming for us. The Reactive Manifesto paved the way for looking at things with a different eye, and with Rx implementations available in many languages, Rx now feels omnipresent.

For me it all started with RxJava and RxGroovy. Now Project Reactor offers a library for building reactive systems on top of the Reactive Streams specification, Spring has incorporated reactive principles in version 5, and Java 9 has added the Flow API, so it is evident that Reactive is gaining both strength and momentum.

Various tools and APIs now offer reactive support as well, be it JDBC, RabbitMQ, Kafka etc.

My interaction with the reactive world has been limited to Observables, Observers/Subscribers and a handful of operators. That is fairly little, and my justification is that I am JUST A BEGINNER HERE.

From what I have understood so far, Observables are the data sources, and it is through Observers/Subscribers that we get the data out of them. Subscribing with an Observer/Subscriber is much like the terminal operations we perform on Java 8 Streams.

What made me curious is how the data flows from Observables to Observers/Subscribers once we subscribe.
Here we are going to take a quick look at it.

Let's take an example first:

 Observable.fromIterable(Arrays.asList("11111", "222", "33"))
Now, when we subscribe to this Observable with an Observer/Subscriber, we start getting values in the onNext() method.
But how does it happen?

Under the hood there is a component known as RxJavaPlugins, which plays a vital role here.
This component provides hooks into various lifecycle events of Observables, with the help of hook functions such as onObservableAssembly, onFlowableAssembly etc.

RxJavaPlugins provides a lot of utility functionality. We will discuss it in upcoming posts.

So when we create an Observable (here, for example, from an Iterable), we end up with a specific Observable type, which here is ObservableFromIterable. Similarly, when we use just(T item) we get ObservableJust, and the same holds whenever we create an Observable or apply an operation to one.

So every creator function of Observable gives us a specific type, and this pattern continues when we apply any operator to an ObservableSource.

For example, when we apply the subscribeOn() operator to an ObservableSource we get ObservableSubscribeOn.
The pattern holds for the other operators too, so there is a uniform structure across the API.

RxJavaPlugins then acts on these specific Observable types: if the corresponding lifecycle hook function is set, it returns whatever that function returns; otherwise it returns the Observable unchanged.

Since the RxJavaPlugins method used here is onAssembly(), the lifecycle hook function involved is onObservableAssembly.
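To make the assembly hook idea concrete, here is a minimal sketch in plain Java. It does not use RxJava itself: MiniSource, MiniFromIterable, TracedSource, MiniPlugins and MiniFactory are all hypothetical names I made up to mirror the pattern described above, where a factory creates a specific type and passes it through an optional hook before returning it.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for an Observable; not an RxJava type.
abstract class MiniSource {
    abstract String describe();
}

// The specific type a factory creates, analogous to ObservableFromIterable.
final class MiniFromIterable extends MiniSource {
    @Override
    String describe() {
        return "MiniFromIterable";
    }
}

// A wrapper that a hook might substitute, for example for tracing.
final class TracedSource extends MiniSource {
    private final MiniSource delegate;

    TracedSource(MiniSource delegate) {
        this.delegate = delegate;
    }

    @Override
    String describe() {
        return "Traced(" + delegate.describe() + ")";
    }
}

// Hypothetical analogue of the hook holder; null means no hook is installed.
final class MiniPlugins {
    static volatile UnaryOperator<MiniSource> onSourceAssembly;

    private MiniPlugins() {
    }

    // Apply the hook if one is set, otherwise return the source unchanged.
    static MiniSource onAssembly(MiniSource source) {
        UnaryOperator<MiniSource> hook = onSourceAssembly;
        return hook == null ? source : hook.apply(source);
    }
}

// Every factory method routes the freshly created type through onAssembly.
final class MiniFactory {
    private MiniFactory() {
    }

    static MiniSource fromIterable() {
        return MiniPlugins.onAssembly(new MiniFromIterable());
    }
}
```

With no hook set, MiniFactory.fromIterable() hands back the MiniFromIterable itself. After setting MiniPlugins.onSourceAssembly = TracedSource::new, the same call returns the wrapped version, which is the kind of substitution an assembly hook allows.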

I think the following Sequence diagram will help us to understand the situation better.

Now, when we subscribe to that specific Observable type, its subscribeActual() method gets called. The Observer/Subscriber that we subscribe with is passed to this method.

This method does the real magic.

Within this method we create an entity of type Disposable (here the disposable type is FromIterableDisposable) and pass it to the onSubscribe() method of the Subscriber/Observer (the one passed as a parameter to subscribeActual()). Finally, we call a method specific to that disposable.

This method (belonging to the Disposable type created in the step above) iterates through the source and passes one value at a time to the onNext() method of the Subscriber/Observer. If an error arises while iterating, the onError() method is invoked; otherwise the onComplete() method is called at the end.
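The steps above can be sketched in plain Java without RxJava on the classpath. Everything here (SketchObserver, SketchDisposable, SketchFromIterable, IterableDisposable, SubscribeFlowSketch) is a simplified, hypothetical stand-in that I wrote to follow the description; it is not the library's actual source.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical, simplified stand-ins for Disposable and Observer.
interface SketchDisposable {
    void dispose();
    boolean isDisposed();
}

interface SketchObserver<T> {
    void onSubscribe(SketchDisposable d);
    void onNext(T value);
    void onError(Throwable e);
    void onComplete();
}

final class SketchFromIterable<T> {
    private final Iterable<? extends T> source;

    SketchFromIterable(Iterable<? extends T> source) {
        this.source = source;
    }

    // Public entry point; the real work happens in subscribeActual.
    void subscribe(SketchObserver<? super T> observer) {
        subscribeActual(observer);
    }

    private void subscribeActual(SketchObserver<? super T> observer) {
        IterableDisposable<T> d = new IterableDisposable<>(observer, source.iterator());
        observer.onSubscribe(d); // step 1: hand the disposable to the observer
        d.run();                 // step 2: start pushing values
    }

    static final class IterableDisposable<T> implements SketchDisposable {
        private final SketchObserver<? super T> downstream;
        private final Iterator<? extends T> iterator;
        private volatile boolean disposed;

        IterableDisposable(SketchObserver<? super T> downstream, Iterator<? extends T> iterator) {
            this.downstream = downstream;
            this.iterator = iterator;
        }

        @Override
        public void dispose() {
            disposed = true;
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }

        // Push one value at a time; stop early if the observer disposed.
        void run() {
            while (!disposed) {
                T value;
                try {
                    if (!iterator.hasNext()) {
                        break;
                    }
                    value = iterator.next();
                } catch (Throwable e) {
                    downstream.onError(e); // a failure while iterating ends the flow
                    return;
                }
                downstream.onNext(value);
            }
            if (!disposed) {
                downstream.onComplete();
            }
        }
    }
}

final class SubscribeFlowSketch {
    // Subscribes a recording observer and returns the callbacks it saw, in order.
    static List<String> demo() {
        final List<String> events = new ArrayList<>();
        new SketchFromIterable<String>(Arrays.asList("11111", "222", "33"))
            .subscribe(new SketchObserver<String>() {
                @Override public void onSubscribe(SketchDisposable d) { events.add("onSubscribe"); }
                @Override public void onNext(String value) { events.add("onNext:" + value); }
                @Override public void onError(Throwable e) { events.add("onError"); }
                @Override public void onComplete() { events.add("onComplete"); }
            });
        return events;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext:11111, onNext:222, onNext:33, onComplete]
        System.out.println(demo());
    }
}
```

The order of the recorded callbacks matches the flow described: onSubscribe() first, then one onNext() per element, then onComplete(). Checking the disposed flag before each emission is what lets an observer stop the flow by calling dispose() on the disposable it received.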

The Sequence Diagram for this flow will be:

This is the basic flow which describes how data flows from the ObservableSource to its corresponding Subscriber.
