Reactive Glimpse Part - II

Reactive Concurrency

Concurrency in Reactive universe has a little different flavour than what we experience in Imperative world.

A Reactive flow by default Single Threaded in nature. But we can achieve concurrency with the help of the operators like subscribeOn(), observeOn() etc. along with the Schedulers that Rx provides.

subscribeOn(): Asynchrously subscribes Subscribers/Observers to an ObservableSource on a Specified scheduler.
onserveOn(): Makes an ObservableSource to send all notifications/events on a specified scheduler asynchronously.
A few points that I would really like to state at the very beginning are:

1) The operators in a flow are always executed serially contextwise.
2) The Flow for a particular data item do not interfere with the flow for another data item belonging to the same ObservableSource.
Let's try to understand the concurrency from the context of the below test case.
    @Test
    public void testConcurrency(){

        rx.Observable.range(1,5)
                .map({val ->
            println "Maps: ${ val +1} at ${Thread.currentThread()}"
            val +1})
                .flatMap({integer -> rx.Observable.just(integer)
                .map({val->
            println "$val at ${Thread.currentThread()}"
            val*val})})
                .subscribe({output -> println "Output: $output at ${Thread.currentThread()}\n\n"})
    
    }
The above example adds 1 to every value of the range and then computes their square. This functionality can be accomplished directly with a single map() operator but instead we broke up the flow in several parts so that we can understand how subscribeOn() and onserveOn() affects the flow.

Now, if we run the above test case the output will be very straightforward and as expected i.e.

Maps: 2 at Thread[main,5,main]
2 at Thread[main,5,main]
Output: 4 at Thread[main,5,main]


Maps: 3 at Thread[main,5,main]
3 at Thread[main,5,main]
Output: 9 at Thread[main,5,main]


Maps: 4 at Thread[main,5,main]
4 at Thread[main,5,main]
Output: 16 at Thread[main,5,main]


........

Now let's introduce a subscribeOn() and investigate:

  rx.Observable.range(1,5)
                .subscribeOn(rx.schedulers.Schedulers.io())
    ......

We will see that the executing thread moves from the Main Thread to One from IO Scheduler when we use subscribeOn. i.e.

Maps: 2 at Thread[RxCachedThreadScheduler-1,5,main]
2 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 4 at Thread[RxCachedThreadScheduler-1,5,main]


Maps: 3 at Thread[RxCachedThreadScheduler-1,5,main]
3 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 9 at Thread[RxCachedThreadScheduler-1,5,main]


Maps: 4 at Thread[RxCachedThreadScheduler-1,5,main]
4 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 16 at Thread[RxCachedThreadScheduler-1,5,main]


Now if we use multiple subscribeOn() along the Main flow then the processing will be assigned to the thread from the scheduler of the first encountered subscribeOn and all the subsequent subscribeOn() will be ignored. i.e.

  rx.Observable.range(1,5)
                .subscribeOn(rx.schedulers.Schedulers.io())
                .map({val ->
            println "Maps: ${ val +1} at ${Thread.currentThread()}"
            val +1})
                .flatMap({integer -> rx.Observable.just(integer)
               // .subscribeOn(rx.schedulers.Schedulers.computation())
                .map({val->
            println "$val at ${Thread.currentThread()}"
            val*val})
        })
                .subscribeOn(rx.schedulers.Schedulers.from(Executors.newFixedThreadPool(3)))
                .subscribe({output -> println "Output: $output at ${Thread.currentThread()}\n\n"})


In the above example there are two schedulers but the subscriber has subscribed on the first encountered subscribeOn() along the main flow.

Maps: 2 at Thread[RxCachedThreadScheduler-1,5,main]
2 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 4 at Thread[RxCachedThreadScheduler-1,5,main]


Maps: 3 at Thread[RxCachedThreadScheduler-1,5,main]
3 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 9 at Thread[RxCachedThreadScheduler-1,5,main]


Maps: 4 at Thread[RxCachedThreadScheduler-1,5,main]
4 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 16 at Thread[RxCachedThreadScheduler-1,5,main]


.......

If we just swap these two schedulers then the output will be:

Maps: 2 at Thread[pool-1-thread-1,5,main]
2 at Thread[pool-1-thread-1,5,main]
Output: 4 at Thread[pool-1-thread-1,5,main]


Maps: 3 at Thread[pool-1-thread-1,5,main]
3 at Thread[pool-1-thread-1,5,main]
Output: 9 at Thread[pool-1-thread-1,5,main]


So, now instead of io scheduler, the fixed thread pool scheduler is in effect.

But if we just uncomment the subscribeOn in the above example then the operation within the flatmap will be assigned to the thread from the computation scheduler, which is very much not in line to what we have just discussed.

The output will look like:

Maps: 2 at Thread[pool-1-thread-1,5,main]
Maps: 3 at Thread[pool-1-thread-1,5,main]
2 at Thread[RxComputationThreadPool-1,5,main]
Maps: 4 at Thread[pool-1-thread-1,5,main]
Maps: 5 at Thread[pool-1-thread-1,5,main]
Maps: 6 at Thread[pool-1-thread-1,5,main]
3 at Thread[RxComputationThreadPool-2,5,main]
Output: 4 at Thread[RxComputationThreadPool-1,5,main]


4 at Thread[RxComputationThreadPool-1,5,main]
Output: 16 at Thread[RxComputationThreadPool-1,5,main]


6 at Thread[RxComputationThreadPool-1,5,main]
Output: 36 at Thread[RxComputationThreadPool-1,5,main]


Output: 9 at Thread[RxComputationThreadPool-2,5,main]


5 at Thread[RxComputationThreadPool-2,5,main]
Output: 25 at Thread[RxComputationThreadPool-2,5,main]

So here we can see Threads from two different Scheduler are in action.
The reason being: flatMap() creates an ObservableSource out of every elements passed as argument and then merges them. While merging it internally subscribes to an Observer known as InnerObserver. So during this process, it subscribes to the Computation Scheduler. We can just refer it as a flow inside a flow.

Now, things will be interesting when we will use observeOn.
Upon using a Single observeOn at the beginning the behaviour will be same when we will use a Single subscribeOn

Now, when we use more than one observeOn along the main flow, then the behaviour will be much different from using more than subscribeOn.
Let's see.

    rx.Observable.range(1,5)
        .observeOn(rx.schedulers.Schedulers.from(Executors.newFixedThreadPool(3)))
                .map({val ->
            println "Maps: ${ val +1} at ${Thread.currentThread()}"
            val +1})
                .flatMap({integer -> rx.Observable.just(integer)
              //  .subscribeOn(rx.schedulers.Schedulers.computation())
        //.observeOn(rx.schedulers.Schedulers.computation())
        // .observeOn(rx.schedulers.Schedulers.io())
                .map({val->
            println "$val at ${Thread.currentThread()}"
            val*val})
        })
         .observeOn(rx.schedulers.Schedulers.computation())
                .subscribe({output -> println "Output: $output at ${Thread.currentThread()}\n\n"})



The output will be:

Maps: 2 at Thread[pool-1-thread-1,5,main]
2 at Thread[pool-1-thread-1,5,main]
Maps: 3 at Thread[pool-1-thread-1,5,main]
3 at Thread[pool-1-thread-1,5,main]
Maps: 4 at Thread[pool-1-thread-1,5,main]
4 at Thread[pool-1-thread-1,5,main]
Maps: 5 at Thread[pool-1-thread-1,5,main]
5 at Thread[pool-1-thread-1,5,main]
Maps: 6 at Thread[pool-1-thread-1,5,main]
6 at Thread[pool-1-thread-1,5,main]
Output: 4 at Thread[RxComputationThreadPool-2,5,main]


Output: 9 at Thread[RxComputationThreadPool-2,5,main]


Output: 16 at Thread[RxComputationThreadPool-2,5,main]


Output: 25 at Thread[RxComputationThreadPool-2,5,main]


Output: 36 at Thread[RxComputationThreadPool-2,5,main]

observeOn always changes the flow of control. Due to this, everything got executed by the Thread from the Fixed Thread Pool, until we have we have used observeOn. As soon as we did that, observeOn took the control of the flow.

Now if we uncomment the observeOn which used IO scheduler from within the flatMap, that observeOn will also be in action and the output will be something like:

Maps: 2 at Thread[pool-1-thread-1,5,main]
Maps: 3 at Thread[pool-1-thread-1,5,main]
Maps: 4 at Thread[pool-1-thread-1,5,main]
Maps: 5 at Thread[pool-1-thread-1,5,main]
3 at Thread[RxCachedThreadScheduler-2,5,main]
Maps: 6 at Thread[pool-1-thread-1,5,main]
5 at Thread[RxCachedThreadScheduler-4,5,main]
Output: 9 at Thread[RxComputationThreadPool-2,5,main]


Output: 25 at Thread[RxComputationThreadPool-2,5,main]


2 at Thread[RxCachedThreadScheduler-1,5,main]
Output: 4 at Thread[RxComputationThreadPool-2,5,main]


4 at Thread[RxCachedThreadScheduler-3,5,main]
Output: 16 at Thread[RxComputationThreadPool-2,5,main]


6 at Thread[RxCachedThreadScheduler-5,5,main]
Output: 36 at Thread[RxComputationThreadPool-2,5,main]

So all the three schedulers are in action for observeOn.

Now if we replace the internal observeOn with subscribeOn, then too we will see the all schedulers in action due to the the behavious of the flatMap() as stated above.

 rx.Observable.range(1,5)
           
        .observeOn(rx.schedulers.Schedulers.from(Executors.newFixedThreadPool(3)))
                .map({val ->
            println "Maps: ${ val +1} at ${Thread.currentThread()}"
            val +1})
                .flatMap({integer -> rx.Observable.just(integer)
                .subscribeOn(rx.schedulers.Schedulers.io())
                .map({val->
            println "$val at ${Thread.currentThread()}"
            val*val})
        })
         .observeOn(rx.schedulers.Schedulers.computation())
                .subscribe({output -> println "Output: $output at ${Thread.currentThread()}\n\n"})


The output will be something like:

Maps: 2 at Thread[pool-1-thread-1,5,main]
Maps: 3 at Thread[pool-1-thread-1,5,main]
Maps: 4 at Thread[pool-1-thread-1,5,main]
Maps: 5 at Thread[pool-1-thread-1,5,main]
2 at Thread[RxCachedThreadScheduler-1,5,main]
Maps: 6 at Thread[pool-1-thread-1,5,main]
Output: 4 at Thread[RxComputationThreadPool-2,5,main]


4 at Thread[RxCachedThreadScheduler-3,5,main]
Output: 16 at Thread[RxComputationThreadPool-2,5,main]


6 at Thread[RxCachedThreadScheduler-5,5,main]
5 at Thread[RxCachedThreadScheduler-4,5,main]
3 at Thread[RxCachedThreadScheduler-2,5,main]
Output: 36 at Thread[RxComputationThreadPool-2,5,main]


Output: 25 at Thread[RxComputationThreadPool-2,5,main]


Output: 9 at Thread[RxComputationThreadPool-2,5,main]

Along the main flow if multiple observeOn are present then all of them, will be in action, which is unlike subscribeOn. This is in very short about how multithreading can be implemented in Rx.

Comments

Popular posts from this blog

Use of @Configurable annotation.

Spring WS - Part 5

Spring WS - Part 4