Parallel RxJava and Spock Oddity


I use RxJava a lot nowadays. Before I joined Netflix I struggled with it a bit and mostly watched from the sidelines, but now I find myself pulling in that dependency in a lot of the Java code I write. I also use the Spock framework a lot for testing. I know there are a lot of Mockito die-hards out there, but for me Spock feels the closest to writing a test spec, and because of Groovy I end up writing much (much!) less code.

It was in one of these projects that I came across an oddity with this combination, and I thought I'd share it.

Consider this scenario: you have a collection of inputs arriving in your code and you need to do some processing. Since the processing of each input is independent of anything else, I could do it in parallel, and this is where RxJava comes in handy: I flatMap each input, and inside the flatMap I do a subscribeOn. The code below uses the computation scheduler; io would be the natural choice if each input involved a call over the wire, and I can assure you that io and most of the other schedulers show the same behaviour.

If we assume that our input data is a String, you could end up with something like this:

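public String parallelize(List<String> lst) {
    return Observable.from(lst)
            // each input gets its own inner Observable, subscribed on a scheduler thread
            .flatMap(s ->
                    Observable.just(s)
                            .map(this::lower)
                            .map(str -> "STRING " + str)
                            .subscribeOn(Schedulers.computation())
            )
            // concatenate the results in whatever order they arrive
            .reduce("", (prev, another) -> prev + another)
            .toBlocking()
            .single();
}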

 

As you can see, for the purpose of this exercise I have assumed that the “processing” means lowercasing a string via a handcrafted method (notice I am NOT using String.toLowerCase) followed by some string concatenation. At the end, after all the inputs have been processed, I just concatenate the results together.
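The handcrafted method itself is not shown here, so the following is only a guess at its shape: a minimal sketch that lowercases ASCII letters by hand. The class name LowerSketch and the ASCII-only behaviour are assumptions for illustration. The one thing that matters for the rest of this post is that lower is a regular, overridable instance method, because that is what lets a Spy intercept it.

```java
// Hypothetical stand-in for the post's hand-written lower() method.
class LowerSketch {
    // Non-final, non-private instance method, so a proxying test double can override it.
    String lower(String s) {
        StringBuilder sb = new StringBuilder(s.length());
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            // Shift only ASCII upper-case letters; leave every other character untouched.
            sb.append(c >= 'A' && c <= 'Z' ? (char) (c + ('a' - 'A')) : c);
        }
        return sb.toString();
    }
}
```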

The only tricky bit here is that I’m doing the subscribeOn inside the flatMap. This is what triggers parallel processing of the inputs, because each inner Observable is subscribed on a worker from the scheduler. Without it, everything runs sequentially on the subscribing thread and nothing is processed in parallel. The flip side is that flatMap merges results as they arrive, so the order of the output is not guaranteed.

Now you can test the code for yourself: it runs absolutely fine if you run the main() method (make sure you pass some string parameters on the command line). So the code itself is OK.

The problem, however, shows up when I write a unit test for this using Spock (and CGLIB). If I write a test like this:

def "parallel"() {
  List<String> lst = ["ABC", "XYz", "one"]
  def x = new ParallelRx()
 
  when:
    def r = x.parallelize(lst)
 
  then:
    r == "STRING xyzSTRING abcSTRING one"
}

With this version everything works fine. The problem starts when I switch to a Spy:

def "parallel"() {
  List<String> lst = ["ABC", "XYz", "one"]
  def x = Spy(ParallelRx)
 
  when:
    def r = x.parallelize(lst)
 
  then:
    r == "STRING xyzSTRING abcSTRING one"
}

You will notice that your test never finishes running! Even if you stub the methods on the Spy, it doesn’t make a difference:

def "parallel"() {
  List<String> lst = ["ABC", "XYz", "one"]
  def x = Spy(ParallelRx)
 
  when:
    def r = x.parallelize(lst)
 
  then:
    r == "STRING xyzSTRING abcSTRING one"
    1 * x.lower("ABC") >> "abc"
    1 * x.lower("XYz") >> "xyz"
    1 * x.lower("one") >> "one"
}

This produces the same result: the test hangs. As I said before, you can try other schedulers and they all fail, with the exception of trampoline! This was my first hint that something somewhere gets screwed up, because trampoline basically executes everything on the same thread. I tried debugging this and got as far as noticing that the BlockingObservable in RxJava has a CountDownLatch which it waits on. In a single-threaded model, the operations that set up the latch, release it and wait on it seem to happen in the right order, but once we use a scheduler with multiple threads something gets messed up. I suspect it’s the proxying that Spock does via CGLIB, but I have found it difficult to confirm that.
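To make the latch mechanics concrete, here is a simplified JDK-only sketch of the general pattern: the caller parks on a latch until a worker thread publishes a result and counts the latch down. This is not RxJava's actual code; the names LatchSketch and blockingSingle are made up for illustration. Unlike the hanging test, this sketch waits with a timeout, so a release that never happens shows up as an exception rather than a test that runs forever.

```java
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Illustrative only: mimics "block until another thread signals completion".
class LatchSketch {
    static String blockingSingle(String input, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.submit(() -> {
                try {
                    result.set("STRING " + input.toLowerCase(Locale.ROOT));
                } finally {
                    done.countDown(); // if this is never reached, the caller waits until the timeout
                }
            });
            // The calling thread parks here until countDown() runs or the timeout expires.
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("worker never released the latch");
            }
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

If the work is lost or stalls before countDown() is called, an untimed await() would block indefinitely, which matches the symptom of a test that never finishes.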

Bottom line: a parallel Rx approach like the one above can only be tested without Spy instances 🙁 (There are of course ways around this, such as pulling the code you are trying to spy on out into a different class, but if you take an approach like the one above you will find that your tests run forever!)
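As a rough sketch of that workaround, assuming the lowercasing is the only thing you want to intercept: move it into a small collaborator and inject it, so the class that owns the Rx pipeline is a plain instance and only the collaborator gets replaced in the test. The names Lowerer, ParallelRxRefactored and process are hypothetical and do not come from the repository, and I have not verified that this avoids the hang in every setup.

```java
import java.util.Locale;

// Hypothetical collaborator: the only piece a test would need to mock or stub.
class Lowerer {
    String lower(String s) {
        return s.toLowerCase(Locale.ROOT); // stand-in for the hand-written logic
    }
}

// The class under test is created normally, so no Spy proxy wraps the pipeline.
class ParallelRxRefactored {
    private final Lowerer lowerer;

    ParallelRxRefactored(Lowerer lowerer) {
        this.lowerer = lowerer;
    }

    // Per-item step that the map() stage inside the flatMap would delegate to.
    String process(String s) {
        return "STRING " + lowerer.lower(s);
    }
}
```

In a Spock spec the idea would be to construct the real class with a test double of the collaborator passed to the constructor, instead of calling Spy on the class that runs the pipeline.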

The code for this post is on Github: https://github.com/liviutudor/spock-spy-rxjava