Java 8 Accumulators and Adders


If you’ve been using Java for a while, then like me you probably welcomed the introduction of the atomic classes (AtomicLong, AtomicBoolean and so on) back in JDK 1.5. They were a big step forward, away from the clunkiness of creating ridiculous bottlenecks in the code just to keep a running total or perform some other simple accumulation.

They not only gave you an out-of-the-box thread-safe class for things like running totals, but also provided atomic operations such as “get and set”, “add and get”, “get and increment” and so on. On top of that, the implementation was (and is!) lock-free, which any Java developer worth their salt will appreciate.

The problem with the atomic classes, though, is that they don’t really eliminate the bottleneck; they just make it smaller.

Consider the classic example of counting all successful hits to a certain page / servlet / component / service / whatever on a web server. The idea behind this is very simple:

  1. Request comes in
  2. If response code is not HTTP 200 (OK) then ignore it and don’t count this request
  3. If however the request is processed successfully and the response code is HTTP 200 (OK) then increment a running counter which at any point tells us how many successful requests we have served.

Pretty simple, right? Right. The code for this as such would be very simple:

...
AtomicLong counter = new AtomicLong(); // set to zero at start
...
// request comes in and is processed
...
if( isRequestSuccessful(request) ) {
   counter.incrementAndGet(); // count the request
}

There is (at least) one problem with the above code though: the call to incrementAndGet does the actual addition/increment! In other words, when that call is made, the system spends some time incrementing the current value by one, storing the result (and returning it too), while making sure that no other update to the value can interleave with this atomic operation. The good thing about this is that by the time the call finishes, the AtomicLong instance contains the up-to-date number of successful calls to our service. The bad news is that every thread updates the same variable: AtomicLong takes no lock, but concurrent updates still contend for that one memory location, so under heavy load threads end up serialised on it, waiting or retrying their updates. If we need the up-to-date number at the end of each call then there is no way around it; however, having worked on apps which required such running counters, I found that we don’t need the number to be kept up to date all the time: we need it to be up to date only when we actually read the value!
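To make the contention scenario concrete, here is a minimal sketch in which several threads increment one shared AtomicLong. The class name AtomicCounterDemo, the method name and the thread counts are made up for illustration; the result is always exact, but every increment goes through the same variable.

```java
import java.util.concurrent.atomic.AtomicLong;

class AtomicCounterDemo {
    // Starts `threads` workers that each bump one shared AtomicLong `perThread` times.
    static long countWithAtomicLong(int threads, int perThread) {
        AtomicLong counter = new AtomicLong(); // starts at zero
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.incrementAndGet(); // all threads hit the same variable
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // wait for every worker before reading the total
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithAtomicLong(8, 100_000)); // 8 * 100_000 increments
    }
}
```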

In other words, this is a case where eventual consistency is preferred: I’d much rather have the call to increment() return right away and not hold up my system, and have the value computed at the point when I specifically request it, returning the precise up-to-date running total.

If eventual consistency is preferred, so that ideally nothing holds us up when we call increment(), then the above implementation is far from ideal. Instead, we would have to implement our own buffering, which stores all these addition requests in some sort of queue / buffer and every now and then actually performs the additions, be it regularly (on a timer?) or on demand (when we explicitly call get()). Also, what if the operation we need for our running counter is not just simple addition or incrementing? What if we want to implement all that “if request successful” logic separately? We could create a “counter” instance to which we supply a function that does the isRequestSuccessful() check; our code then simply calls increment(), and under the covers increment() invokes the function and proceeds with the increment only if the result is true. This way the code looks much cleaner:

 

// a custom one-method interface (not java.util.function.Function)
interface Function {
    boolean ok();
}

class MyCounter {
    private final Function f;
    public MyCounter( Function f ) {
        this.f = f;
    }

    public void increment() {
        if( f.ok() ) {
            // proceed to increment the counter
        }
    }
}
//...
// our code will then do something like this:
Function f = new Function() {
    public boolean ok() {
        return requestIsSuccessful;
    }
};
MyCounter counter = new MyCounter(f);
//...
// then we simply call increment:
counter.increment();

If you look at the above, the final bit (the client asking for the increment to take place) looks much cleaner as it’s a simple call to increment(); everything else, such as checking the request, happens behind the curtains.

Now this is a step forward in code clarity; however, it does nothing for the bottleneck, because the call to our class’s increment() method still ends up in a call to AtomicLong.incrementAndGet(), and all threads still contend on that single variable to perform the atomic operation. So we have increased code clarity but haven’t eliminated the bottleneck.
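For what it’s worth, on Java 8 the same idea can be sketched without a hand-written interface, using the standard java.util.function.BooleanSupplier. The class name GuardedCounter and the requestIsSuccessful flag are hypothetical; note that the AtomicLong inside is still the single contended variable discussed above.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;

class GuardedCounter {
    private final BooleanSupplier check;               // decides whether a call should count
    private final AtomicLong total = new AtomicLong(); // still one shared variable

    GuardedCounter(BooleanSupplier check) {
        this.check = check;
    }

    void increment() {
        if (check.getAsBoolean()) {
            total.incrementAndGet(); // the contention point has not moved
        }
    }

    long get() {
        return total.get();
    }

    public static void main(String[] args) {
        boolean[] requestIsSuccessful = { true }; // hypothetical stand-in for real request state
        GuardedCounter counter = new GuardedCounter(() -> requestIsSuccessful[0]);
        counter.increment();            // counted
        requestIsSuccessful[0] = false;
        counter.increment();            // ignored
        System.out.println(counter.get());
    }
}
```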

At this point I have to go back a few years to when I hit this obstacle myself, which is when I decided to contribute to the Apache Commons Functor project. I submitted the whole Aggregator package and, after a few iterations (thanks again to Simone Tripodi for his great help in this!), it went into the main code base. The idea behind aggregators was rather simple: given a continuous source of discrete data, we want to aggregate this data in some way and have the result of the aggregation computed on a timer rather than necessarily on the fly. (Implementations for both were submitted, though.)

Let’s apply this to the simple case of our counter of successful requests to see how this would work:

  1. We implement a list-backed aggregator: this adds elements to a list rather than performing any operations
  2. We supply a function to the aggregator which looks a bit like our function above: it simply checks if the request was processed successfully and if so counts the request
  3. We then supply a time listener: this gets notified periodically when we are about to flush the list. At this point we get a reference to the list and can decide to call our function to aggregate the data.
  4. Our code still simply calls aggregator.add(request) rather than increment(), and everything else happens behind the curtains.

The above approach avoids blocking callers for the longer period needed to perform the aggregation / increment / etc. and also allows us to encapsulate the checking and processing of data away from the client.

There is still a problem with the above: it does block on add()! The blocking occurs in the call to list.add() — since we use a list-backed storage, adding elements to the list has to block so the data store behind it doesn’t get corrupted. Sure, if we don’t care for the order in which the elements come in, we can use some other collection implementation which offers a higher throughput in a threaded environment.
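As an illustration of the “add now, aggregate later” idea with a higher-throughput collection, here is a rough sketch backed by a ConcurrentLinkedQueue. This is not the Apache Commons Functor API: the class QueueBackedAggregator and its add() and flush() methods are invented for this example, and flush() would be called from a timer or on demand.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongBinaryOperator;

class QueueBackedAggregator {
    // Non-blocking queue: add() does not take a lock.
    private final ConcurrentLinkedQueue<Long> pending = new ConcurrentLinkedQueue<>();
    private final LongBinaryOperator function;
    private long result; // only touched inside the synchronized flush()

    QueueBackedAggregator(LongBinaryOperator function, long initial) {
        this.function = function;
        this.result = initial;
    }

    // Cheap for callers: just enqueue the value, no aggregation yet.
    void add(long value) {
        pending.add(value);
    }

    // Drains everything queued so far and folds it into the running result.
    synchronized long flush() {
        Long value;
        while ((value = pending.poll()) != null) {
            result = function.applyAsLong(result, value);
        }
        return result;
    }

    public static void main(String[] args) {
        QueueBackedAggregator aggregator = new QueueBackedAggregator(Long::sum, 0L);
        aggregator.add(1);
        aggregator.add(1);
        System.out.println(aggregator.flush());
    }
}
```

The trade-off is that the queue grows until the next flush, so the flush interval bounds memory use as well as staleness.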

Despite all of the above, I believe the aggregator implementation in Apache Commons Functor was a step in the right direction, and not only because I wrote it! 🙂 It took the aggregation operation away from the point where add() was called and applied it later on. This allowed complex (and lengthy!) operations to be performed, safe in the knowledge that we are not blocking incoming data while we process it.

As such, when Java 8 came out and I started looking at the additions to the language and the platform, the adders and accumulators caught my attention right away, since they sounded very similar to what I had implemented for the Apache Commons Functor project! And guess what? It turns out it is the same idea, slightly simplified and with a much faster implementation! (Thanks, Oracle, for acknowledging the fact that we needed that in the JDK! 😉 )

LongAccumulator

Let’s look at the JavaDoc for this class which reads:

One or more variables that together maintain a running long value updated using a supplied function. When updates (method accumulate(long)) are contended across threads, the set of variables may grow dynamically to reduce contention. Method get() (or, equivalently, longValue()) returns the current value across the variables maintaining updates.

This class is usually preferable to AtomicLong when multiple threads update a common value that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.

The order of accumulation within or across threads is not guaranteed and cannot be depended upon, so this class is only applicable to functions for which the order of accumulation does not matter. The supplied accumulator function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads. The function is applied with the current value as its first argument, and the given update as the second argument. For example, to maintain a running maximum value, you could supply Long::max along with Long.MIN_VALUE as the identity.

Let me translate this for you:

  • This class relies on more than one variable to store the data. As more requests come in and updates to the existing variables collide, the code may create additional variables to reduce contention.
  • It allows a function to be supplied to perform the “accumulation”; this is the same as the concept of “aggregation” in Apache Commons Functor, and it’s simply a function which performs an operation on 2 values and returns a result. The nicety here is that since Java 8 introduced lambda expressions, we don’t have to formalize interfaces and superclasses, but can simply specify a lambda expression with 2 parameters.
  • The accumulator function is applied to one of these variables on each call to accumulate(); the overall value is only combined, using the same function, when we ask for it via get().
  • It is expected to perform much better than AtomicLong under high contention.
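The running maximum mentioned in the JavaDoc can be sketched as follows. The class RunningMaxDemo and the method maxOf are illustrative names only; the parallel stream is just a convenient way of getting several threads to call accumulate().

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.LongStream;

class RunningMaxDemo {
    // Feeds the values into a LongAccumulator from multiple threads and returns the maximum.
    static long maxOf(long... values) {
        // Long::max is the accumulator function, Long.MIN_VALUE its identity.
        LongAccumulator max = new LongAccumulator(Long::max, Long.MIN_VALUE);
        LongStream.of(values).parallel().forEach(max::accumulate);
        return max.get(); // combines the internal variables using Long::max
    }

    public static void main(String[] args) {
        System.out.println(maxOf(3, 42, -7, 19));
    }
}
```

Because max is commutative and associative, the order in which threads deliver values does not affect the result, which is exactly the property the JavaDoc asks for.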

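Let’s look back at our example of counting the successful calls to our web service. The idea is to create an accumulator with a lambda expression which checks isRequestSuccessful() and, if so, adds the 2 operands; our client then simply calls accumulate(). As a sketch (it assumes that request is an effectively final variable in scope where the lambda is created):

// ...
LongAccumulator acc = new LongAccumulator((a, b) -> isRequestSuccessful(request) ? a + b : a, 0L);
// ...
acc.accumulate(1L);

Bear in mind the JavaDoc’s advice quoted above: the function should be side-effect-free and must not depend on the order of accumulation. The same function is also used to combine the internal variables when get() is called, so in real code it is safer to do the check before calling accumulate() and keep the function a pure combination such as Long::sum.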

What happens in the background is:

  1. A call to accumulate(1) comes in
  2. The framework tries to update one of the variables created to store the data (initially there is just one).
  3. If that update collides with another thread updating the same variable, the framework tries another variable; if contention persists, a new variable may be created to hold the update.
  4. The lambda expression is applied to whichever variable the thread ends up updating; a thread does not sit blocked waiting for other threads (and, as the JavaDoc notes, the function may be re-applied when an attempted update fails due to contention).
  5. Steps 3 and 4 are executed for each thread calling accumulate(1)
  6. When we finally call get() (which is equivalent to longValue() according to the JavaDoc), the framework goes through all these variables, in an order which cannot be guaranteed, and applies the lambda expression to combine them into the final value.

As you can guess, this can increase the performance of the increment operation hugely under contention! I will probably follow up on this soon with some timing comparisons, as I need to find a way to generate a huge number of threads to simulate thread contention.
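Putting these steps together for the request counter, here is a minimal sketch. It assumes the success check is done by the caller so that the accumulator function stays a pure Long::sum; the class SuccessCounterDemo and the status-code version of isRequestSuccessful() are stand-ins invented for this example.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.stream.IntStream;

class SuccessCounterDemo {
    // Hypothetical stand-in for isRequestSuccessful(request): HTTP 200 counts as success.
    static boolean isRequestSuccessful(int statusCode) {
        return statusCode == 200;
    }

    // Counts the successful status codes, accumulating from several threads.
    static long countSuccessful(int[] statusCodes) {
        LongAccumulator acc = new LongAccumulator(Long::sum, 0L);
        IntStream.of(statusCodes).parallel().forEach(code -> {
            if (isRequestSuccessful(code)) {
                acc.accumulate(1L); // updates one of the internal variables
            }
        });
        return acc.get(); // folds all internal variables together with Long::sum
    }

    public static void main(String[] args) {
        System.out.println(countSuccessful(new int[] { 200, 404, 200, 500, 200 }));
    }
}
```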

LongAdder

This is another interesting class in Java 8, and it seems to be a specialized version of the accumulator. Whereas the accumulator offers a method accumulate() (which is generic and performs whatever operation was specified in the constructor), the adder offers an add() method (which, as the name suggests, simply adds the given value to the running sum), along with conveniences such as increment() and sum().

In fact, looking at the JavaDoc for accumulator, this is stated clearly:

Class LongAdder provides analogs of the functionality of this class for the common special case of maintaining counts and sums. The call new LongAdder() is equivalent to new LongAccumulator((x, y) -> x + y, 0L).

So if we look back at our example, then this can be easily implemented as follows:

       LongAdder counter = new LongAdder();
       if( isRequestSuccessful(request)) {
           counter.add(1);
       }

As you can see, we’re back to our client performing the check prior to the call to add(), as was the case with AtomicLong. That’s because the adder doesn’t allow the operation to be abstracted: it is a very specific one, addition. However, thread contention is hugely decreased compared to the AtomicLong class (for the reasons explained above for the accumulator class).
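A multi-threaded version of the adder example might look like the sketch below. The class LongAdderDemo, the method name and the thread counts are illustrative; the total is read with sum() once all workers have finished.

```java
import java.util.concurrent.atomic.LongAdder;

class LongAdderDemo {
    // Starts `threads` workers that each increment a shared LongAdder `perThread` times.
    static long countWithLongAdder(int threads, int perThread) {
        LongAdder counter = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    counter.increment(); // equivalent to add(1)
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join(); // make sure all updates are done before summing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return counter.sum(); // adds up the internal variables
    }

    public static void main(String[] args) {
        System.out.println(countWithLongAdder(8, 100_000)); // 8 * 100_000 increments
    }
}
```

Note that sum() called while updates are still in flight is not an atomic snapshot, which is the eventual-consistency trade-off in practice.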

Conclusion

You can still use AtomicLong in your apps if you need your get() operation to be fast, since the value has already been calculated at the point of call; however, if you are looking for eventual consistency, then the adder is a much better approach. And if you also want more complicated processing to be applied to your data, then the accumulator is the way to go. Unfortunately, that means that in the long run my aggregator is going to be deprecated; however, I take consolation in the fact that my aggregator pattern came before Oracle’s accumulator 🙂

 
