Playing Around with Buffers


I recently found myself using the Buffer class in the Apache Commons Collections framework and realised that the Buffer-related classes don’t get as much coverage as they probably should, since they can provide out-of-the-box solutions to common development problems. So I started this little exercise, partly to demonstrate some of the powerful Buffer-based classes in Commons Collections and partly for my sheer amusement 🙂 So here it is: if you didn’t know much about Buffers before, you might find out a useful thing or two; failing that, you will definitely be amused by how much one can complicate a simple problem 🙂

To illustrate some of the features of the Buffer classes, I thought of a simple example (read “useless”, come to think of it, but that’s not the point!): converting a series of strings read from the console into integers. The idea is that we read a set of lines from the console, assuming one number per line, and convert each line into an integer if conversion is possible, or return a predefined value (let’s assume zero here) if the line doesn’t contain a valid integer. Simple, right?

Straight away, your average developer would write something like this:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * Simple class with a single main method which reads line by line from the
 * console input and converts each line into an integer which is printed to the
 * console right away. If the line read is not an integer, zero is printed.
 */
public class ConvertStringSimple {
 /** Value shown if the input string is not an integer. */
 public static final int    DEFAULT_VALUE = 0;
 
 /** Expected "command" to exit program. */
 public static final String EXIT_CMD      = "q";
 
 /**
  * Program entry point.
  *
  * @param args
  *            Command-line arguments. Ignored.
  */
 public static void main(String[] args) {
  try {
   BufferedReader rdr = new BufferedReader(new InputStreamReader(System.in));
   String line;
   while ((line = rdr.readLine()) != null) {
    // read a line at a time and convert it in-place
    // if exit command then simply exit
    if (EXIT_CMD.equalsIgnoreCase(line)) {
     System.exit(0);
    }
    try {
     System.out.println(Integer.parseInt(line));
    } catch (NumberFormatException e) {
     System.out.println(DEFAULT_VALUE);
    }
   }
  } catch (IOException e) {
   // readLine() failed: report it and exit with an error code
   e.printStackTrace();
   System.exit(1); // boo!
  }
 }
}
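One drawback of the version above is that it can only be checked by typing at a console. As a minimal sketch (the class and method names are hypothetical, not part of the downloadable sources), the same loop can read from any Reader and collect its results instead of printing them as it goes, which makes the “zero for anything that isn’t an integer” rule easy to test:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, testable variant of ConvertStringSimple. */
class ConvertStringTestable {
    static final int DEFAULT_VALUE = 0;
    static final String EXIT_CMD = "q";

    /** Parses one line as an int, falling back to DEFAULT_VALUE on bad input. */
    static int parseOrDefault(String line) {
        try {
            return Integer.parseInt(line);
        } catch (NumberFormatException e) {
            return DEFAULT_VALUE;
        }
    }

    /** Converts every line up to the end of input or the exit command. */
    static List<Integer> convertAll(Reader input) {
        List<Integer> out = new ArrayList<>();
        try (BufferedReader rdr = new BufferedReader(input)) {
            String line;
            while ((line = rdr.readLine()) != null && !EXIT_CMD.equalsIgnoreCase(line)) {
                out.add(parseOrDefault(line));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e); // keep callers free of checked exceptions
        }
        return out;
    }
}
```

In main() you would pass new InputStreamReader(System.in) and print each value; in a test, a StringReader over "1\nabc\n-5\nq\n9\n" yields [1, 0, -5], since nothing after the q is read.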

Now that’s hardly OO or elegant, so you might want to add a bit more “class” to it, thinking (hopefully :D) something along these lines:

  • I will need a producer, which simply produces String instances from console input
  • I also need a consumer, which takes what gets produced in the step above and “does stuff” with it to create the integers
  • The “does stuff” bit, in an OOP manner, has to be offloaded to a transformer, which simply takes a String instance and applies whatever processing is needed to produce an integer
  • The producer and consumer obviously have to share some sort of buffer: the producer dumps stuff in there, and the consumer picks it up from there and uses the transformer to ultimately produce integers. The consumer and producer can also run in separate threads, independently of each other, and only “communicate” via this buffer.

So with those in mind, you could start writing a transformer like this:

/**
 * Simple interface for a "transformer". A transformer takes one type of data
 * and converts it into another type.
 */
public interface Transformer<T, V> {
 /**
  * The actual transform operation.
  *
  * @param input
  *            Data to be transformed
  * @return Transformed data.
  */
 V transform(T input);
}

This is a generic interface, and the implementation we need here would be something like this:

/**
 * Simple transformation which tries to parse the input string as an integer,
 * and if it fails parsing it (wrong format?) it returns an integer with value
 * zero (0).
 */
public class StringToIntTransformer implements Transformer<String, Integer> {
 @Override
 public Integer transform(String input) {
  try {
   return Integer.valueOf(input);
  } catch (Exception e) {
   return 0;
  }
 }
}
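As an aside: since Java 8 the JDK ships a standard interface with the same shape as our Transformer<T, V>, namely java.util.function.Function<T, R>. A minimal sketch of the same conversion written as a lambda (the holder class name is hypothetical) also helps pin down what “not an integer” means for Integer.valueOf(): surrounding whitespace, out-of-range values and null all fall back to zero, while a leading plus sign is accepted (Java 7 onwards).

```java
import java.util.function.Function;

/** Hypothetical holder for a JDK-only equivalent of StringToIntTransformer. */
class StringToIntFunction {
    /** Same rule as StringToIntTransformer: parse, or fall back to zero. */
    static final Function<String, Integer> STRING_TO_INT = input -> {
        try {
            return Integer.valueOf(input);
        } catch (NumberFormatException e) { // also thrown for a null input
            return 0;
        }
    };
}
```

Using a JDK interface has the advantage that other libraries already understand it, which is exactly the concern raised further down about “our” interfaces.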

That’s the transformer sorted. The producer, as we said, simply reads a line from the console and produces a String (basically the line read), so something like this:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

/**
 * Class used as a producer: reads strings from the console and puts them in a
 * "buffer". The buffer is a blocking queue in this case, which is supplied at
 * creation time. This class does no synchronization of its own; it relies on
 * the supplied BlockingQueue being thread-safe.
 */
public class StringProducer {
 /** Expected "command" to exit program. */
 public static final String EXIT_CMD = "q";
 
 /** Queue used internally to dump the objects produced to. */
 private BlockingQueue<String> queue;
 
 /**
  * Used to read strings from console. The strings are deposited in
  * {@link #queue the queue} and this is how they are produced.
  */
 private BufferedReader        reader;
 
 /**
  * The producing of strings is done in a separate thread -- this one.
  */
 private Thread                producerThread;
 
 /** Notifies these components when we're done. */
 private List<EndListener>     listeners;
 
 /**
  * Creates an object and initializes the queue we will be producing the
  * objects into.
  *
  * @param queue
  *            Queue which will store all the produced objects
  */
 public StringProducer(BlockingQueue<String> queue) {
  this.queue = queue;
  this.reader = new BufferedReader(new InputStreamReader(System.in));
  this.listeners = new ArrayList<EndListener>();
  this.producerThread = new Thread(new Runnable() {
   @Override
   public void run() {
    String line;
    try {
     while ((line = reader.readLine()) != null) {
      if (EXIT_CMD.equalsIgnoreCase(line)) {
       break;
      }
      // put() waits for room (with a SynchronousQueue: for a consumer);
      // add() would throw IllegalStateException instead
      StringProducer.this.queue.put(line);
     }
    } catch (IOException e) {
     System.err.println("Error:" + e.getMessage());
    } catch (InterruptedException e) {
     // interrupted while handing over a line: stop producing
    } finally {
     notifyDone();
    }
   }
  });
 }
 
 public void addEndListener(EndListener listener) {
  listeners.add(listener);
 }
 
 public void startProduce() {
  producerThread.start();
 }
 
 private void notifyDone() {
  for (EndListener l : listeners) {
   l.end();
  }
 }
}

To simplify our implementation, we use a BlockingQueue as the “buffer” between producer and consumer. First of all, this means we don’t have to worry about synchronization (all BlockingQueue implementations are thread-safe); it also means we can use methods like put() and take(), which block until space or data becomes available in the buffer, rather than having to implement our own signalling with wait()/notify() etc.
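One detail matters here: the ConvertStringOop wiring below passes in a SynchronousQueue, which has no capacity at all. Each put() waits for a matching take(), whereas add() and offer() fail straight away if no consumer happens to be waiting at that moment, so the producer should use put() rather than add(). A minimal, self-contained sketch of the hand-off (class and method names are hypothetical), stopping the consumer with an interrupt in the same spirit as the end() notification:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;

/** Hypothetical demo of a producer/consumer hand-off over a SynchronousQueue. */
class HandOffDemo {
    /** Hands each line to a consumer thread, which converts it; returns what it produced. */
    static List<Integer> handOff(List<String> lines) {
        BlockingQueue<String> queue = new SynchronousQueue<>();
        List<Integer> results = new ArrayList<>(); // only touched by the consumer until join()
        Thread consumer = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String s = queue.take(); // blocks until the producer hands a line over
                    results.add(parseOrZero(s));
                } catch (InterruptedException e) {
                    break; // end notification arrived while waiting
                }
            }
        });
        consumer.start();
        try {
            for (String line : lines) {
                queue.put(line); // returns only once the consumer has taken the line
            }
            consumer.interrupt(); // plays the role of end() in the post
            consumer.join();      // join() also makes 'results' safely visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    static int parseOrZero(String s) {
        try {
            return Integer.parseInt(s);
        } catch (NumberFormatException e) {
            return 0;
        }
    }
}
```

Because put() on a SynchronousQueue only returns after the hand-off, every line is consumed before the interrupt is sent, so handOff(Arrays.asList("1", "x", "3")) collects [1, 0, 3].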

Note also that we introduced a new component here, the EndListener interface. It isn’t strictly needed, as the producer could simply call System.exit(0) when it encounters the “q” (quit) command on the command line. Rather than doing that, however, we want to decouple, in an OOP manner, “what should happen when the producer finishes” from the rest of the world, so we introduced this simple listener interface:

/**
 * Used to signal in between components when we should end the producing or
 * consuming.
 */
public interface EndListener {
 /**
  * The only message sent by this notifier.
  */
 public void end();
}

The idea is that the producer holds a list of listeners, and once it’s done reading from the console (whether because the user typed “q”, the input ran out, the thread got interrupted, etc.), it notifies each component interested in this notification, so that these components can clean up after themselves.

Finally, the consumer, as we said, applies the transformer to each String it reads from the buffer:

import java.util.concurrent.BlockingQueue;

/**
 * Simple consumer. Takes a string from a given "buffer" (queue) and transforms
 * it into an integer and then prints it on console.
 */
public class StringConsumer implements EndListener {
 /** Queue used to consume data from. */
 private BlockingQueue<String>        queue;
 
 /** The transform operation used to convert strings into integers. */
 private Transformer<String, Integer> transformer;
 
 /** Consumption of strings is done in this separate thread. */
 private Thread                       consumerThread;
 
 /**
  * Creates a consumer which starts right away consuming items from the
  * queue. The {@link #consumerThread consumer thread} runs until
  * {@link #end() the end notification} is received.
  *
  * @param queue
  *            Queue to consume from
  * @param transformer
  *            Implementation of an operation which converts strings to
  *            integers
  */
 public StringConsumer(BlockingQueue<String> queue, Transformer<String, Integer> transformer) {
  this.queue = queue;
  this.transformer = transformer;
  this.consumerThread = new Thread(new Runnable() {
   @Override
   public void run() {
    while (!StringConsumer.this.consumerThread.isInterrupted()) {
     try {
      String s = StringConsumer.this.queue.take();
      Integer i = StringConsumer.this.transformer.transform(s);
      System.out.println(i);
     } catch (InterruptedException e) {
      break;
     }
    }
   }
  });
  this.consumerThread.start();
 }
 
 @Override
 public void end() {
  consumerThread.interrupt();
 }
}

All that’s left is to wire everything up:

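import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;

/**
 * A more OOP-centric approach to converting strings to numbers.
 */
public class ConvertStringOop {
 /**
  * Program entry point.
  *
  * @param args
  *            Command-line arguments. Ignored.
  */
 public static void main(String[] args) {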
  BlockingQueue<String> queue = new SynchronousQueue<String>();
  Transformer<String, Integer> transformer = new StringToIntTransformer();
  StringProducer producer = new StringProducer(queue);
  StringConsumer consumer = new StringConsumer(queue, transformer);
  producer.addEndListener(consumer);
 
  final CountDownLatch latch = new CountDownLatch(1);
  EndListener listener = new EndListener() {
   @Override
   public void end() {
    latch.countDown();
   }
  };
  producer.addEndListener(listener);
 
  producer.startProduce();
  try {
   latch.await();
  } catch (InterruptedException e) {
   System.exit(1);
  }
 }
}

As a side note, we make use of the EndListener here again, so that the main application thread gets notified when everything is finished and exits cleanly.
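The latch mechanism on its own: the main thread parks in await() until another thread calls countDown(), here from an end-style callback fired in a finally block, much like notifyDone(). In this minimal sketch the class and method names are hypothetical, a Runnable stands in for EndListener, and the timeout on await() is an assumption added for illustration (the original code waits indefinitely), so that a missed notification cannot hang the program forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical demo of waiting for an "end" notification from another thread. */
class LatchDemo {
    /** Starts a worker that fires onEnd when done; true if the end was seen in time. */
    static boolean runAndWait(long timeoutMillis) {
        final CountDownLatch latch = new CountDownLatch(1);
        Runnable onEnd = latch::countDown; // stands in for EndListener.end()
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(50);          // pretend to read console input for a while
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                onEnd.run();               // notify whatever happened, like notifyDone()
            }
        });
        worker.start();
        try {
            // false if the timeout expires before countDown() is called
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```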

Right, now all of that looks OOP-ish, so all good. The trouble is that we have to provide some of the infrastructure ourselves here: mainly the transformer, but also things like validation. While this is a very OOP approach (define interfaces and provide implementations for them, transparently to the classes using them), the downside is that those interfaces are our interfaces; in other words, the frameworks we use don’t know about them at the time we write (or run) our code. In this particular case, it would be great to have a consumer that is totally agnostic of the transformations applied to the data it retrieves: the consumer should really just pick up data from the queue and print it out. Unfortunately we cannot do that here, since the Queue interface doesn’t offer any hooks for it, so we would have to build all the wrappers around it ourselves to keep the consumer completely unaware of any transformation.

This is one of the things the Buffer implementations in Apache Commons Collections help with: just as with the java.io package (and in fact many others), we can decorate a buffer to turn it into a different type of buffer, and we can chain these decorators as much as needed to achieve everything we want. From the user’s point of view, working at interface level, we are still dealing with a Buffer, but under the covers that Buffer does all the validation, transformation and so on for us!
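To make the decorator idea concrete without pulling in Commons Collections, here is a minimal plain-JDK sketch. SimpleBuffer and the three classes implementing it are hypothetical stand-ins that play the roles of Buffer, BlockingBuffer, PredicatedBuffer and TransformedBuffer; they are not the real Commons implementations. Each layer does one job and delegates to the layer it wraps, and the chain is built in the same order as in ConvertStringBuffers further down.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.function.Predicate;

/** Hypothetical untyped buffer contract, loosely modelled on Buffer's add()/remove(). */
interface SimpleBuffer {
    void add(Object o);
    Object remove();
}

/** Innermost layer (BlockingBuffer's role): thread-safe, remove() waits for data. */
class BlockingSimpleBuffer implements SimpleBuffer {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    public void add(Object o) {
        queue.add(o); // unbounded, so this never blocks
    }

    public Object remove() {
        try {
            return queue.take(); // blocks until an element is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}

/** PredicatedBuffer's role: reject anything failing the predicate, then delegate. */
class PredicatedSimpleBuffer implements SimpleBuffer {
    private final SimpleBuffer inner;
    private final Predicate<Object> predicate;

    PredicatedSimpleBuffer(SimpleBuffer inner, Predicate<Object> predicate) {
        this.inner = inner;
        this.predicate = predicate;
    }

    public void add(Object o) {
        if (!predicate.test(o)) {
            throw new IllegalArgumentException("rejected: " + o);
        }
        inner.add(o);
    }

    public Object remove() {
        return inner.remove();
    }
}

/** TransformedBuffer's role: transform each element on the way in, then delegate. */
class TransformedSimpleBuffer implements SimpleBuffer {
    private final SimpleBuffer inner;
    private final Function<Object, Object> transformer;

    TransformedSimpleBuffer(SimpleBuffer inner, Function<Object, Object> transformer) {
        this.inner = inner;
        this.transformer = transformer;
    }

    public void add(Object o) {
        inner.add(transformer.apply(o));
    }

    public Object remove() {
        return inner.remove();
    }
}

/** Wires the chain: transformation outermost, then validation, then blocking storage. */
class DecoratorDemo {
    static SimpleBuffer build() {
        SimpleBuffer blocking = new BlockingSimpleBuffer();
        SimpleBuffer validated = new PredicatedSimpleBuffer(blocking, o -> o instanceof Integer);
        return new TransformedSimpleBuffer(validated, o -> {
            try {
                return Integer.valueOf((String) o);
            } catch (RuntimeException e) { // not a String, or not a number
                return 0;
            }
        });
    }
}
```

Adding "12" and then "oops" to the built chain and removing twice yields 12 and 0. Note that the order of decoration matters: with the predicate outermost, it would see the raw String and reject every line.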

Ideally, our consumer should really look as simple as this:

import org.apache.commons.collections.Buffer;

/**
 * Simple consumer. Takes an integer from a given "buffer" and prints it on
 * the console. The transformation from string to integer is done by the
 * caller, by building a TransformedBuffer on top of the given one.
 */
public class StringConsumer {
 /** Buffer used to consume data from. */
 private Buffer buffer;
 
 /** Consumption of integers is done in this separate thread. */
 private Thread consumerThread;
 
 /**
  * Creates a consumer which starts right away consuming items from the
  * buffer. The {@link #consumerThread consumer thread} runs until it is
  * interrupted or the buffer throws an exception.
  *
  * @param buffer
  *            Buffer to consume from; expected to hand out Integer objects
  */
 public StringConsumer(Buffer buffer) {
  this.buffer = buffer;
  this.consumerThread = new Thread(new Runnable() {
   @Override
   public void run() {
    while (!StringConsumer.this.consumerThread.isInterrupted()) {
     try {
      Integer i = (Integer) StringConsumer.this.buffer.remove();
      System.out.println(i);
     } catch (Exception e) {
      break;
     }
    }
   }
  });
  this.consumerThread.start();
 }
}

As you can see our consumer simply takes integers directly off the buffer and prints them — it’s not its concern how the integers get there! (In the above example I’ve taken out everything to do with notifying components that we’re done etc — basically the whole EndListener functionality.)

Also, the producer should simply “spit” Strings out into the buffer:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.collections.Buffer;

/**
 * Class used as a producer: reads strings from the console and puts them in a
 * "buffer". The buffer in this instance is an ASF Commons Collections
 * implementation of the Buffer interface.
 */
public class StringProducer {
 /** Expected "command" to exit program. */
 public static final String EXIT_CMD = "q";
 
 /** Buffer used internally to dump the objects produced to. */
 private Buffer             buffer;
 
 /**
  * Used to read strings from console. The strings are deposited in
  * {@link #buffer the buffer} and this is how they are produced.
  */
 private BufferedReader     reader;
 
 /** The producing of strings is done in a separate thread -- this one. */
 private Thread             producerThread;
 
 /** Notifies these components when we're done (same as before). */
 private List<EndListener>  listeners;
 
 /**
  * Creates an object and initializes the buffer we will be producing the
  * objects into.
  *
  * @param buffer
  *            Buffer which will store all the produced objects
  */
 public StringProducer(Buffer buffer) {
  this.buffer = buffer;
  this.reader = new BufferedReader(new InputStreamReader(System.in));
  this.listeners = new ArrayList<EndListener>();
  this.producerThread = new Thread(new Runnable() {
   @Override
   public void run() {
    String line;
    try {
     while ((line = reader.readLine()) != null) {
      if (EXIT_CMD.equalsIgnoreCase(line)) {
       break;
      }
      try {
       StringProducer.this.buffer.add(line);
      } catch (IllegalArgumentException e) {
       // rejected by the predicate: not an Integer after transformation
      }
     }
    } catch (IOException e) {
     System.err.println("Error:" + e.getMessage());
    } finally {
     notifyDone();
    }
   }
  });
 }
 
 public void addEndListener(EndListener listener) {
  listeners.add(listener);
 }
 
 public void startProduce() {
  producerThread.start();
 }
 
 private void notifyDone() {
  for (EndListener l : listeners) {
   l.end();
  }
 }
}

The caller then ensures the right thing happens by decorating the buffer as necessary:

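import java.util.concurrent.CountDownLatch;

import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.Predicate;
import org.apache.commons.collections.Transformer;
import org.apache.commons.collections.buffer.BlockingBuffer;
import org.apache.commons.collections.buffer.PredicatedBuffer;
import org.apache.commons.collections.buffer.TransformedBuffer;
import org.apache.commons.collections.buffer.UnboundedFifoBuffer;

/**
 * An approach to converting strings to numbers using the ASF Commons
 * Collections buffers.
 */
public class ConvertStringBuffers {
 /**
  * Program entry point.
  *
  * @param args
  *            Command-line arguments. Ignored.
  */
 public static void main(String[] args) {
  /* Ensure we only accept Integers. */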
  Predicate predicate = new Predicate() {
   @Override
   public boolean evaluate(Object object) {
    if (object == null) {
     return false;
    }
    if (!(object instanceof Integer)) {
     return false;
    }
    return true;
   }
  };
  /* Used to convert string to int. */
  Transformer transformer = new Transformer() {
   @Override
   public Object transform(Object input) {
    try {
     if (!(input instanceof String)) {
      return 0;
     }
     String sInput = (String) input;
     return Integer.valueOf(sInput);
    } catch (Exception e) {
     return 0;
    }
   }
  };
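  // innermost: blocking (and synchronized); then validation; outermost: transformation
  Buffer buffer = TransformedBuffer.decorate(
    PredicatedBuffer.decorate(BlockingBuffer.decorate(new UnboundedFifoBuffer()), predicate),
    transformer);
  StringProducer producer = new StringProducer(buffer);
  StringConsumer consumer = new StringConsumer(buffer);
  // assumes StringConsumer implements EndListener, as in the earlier version
  // (the consumer listing above leaves that part out)
  producer.addEndListener(consumer);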
 
  final CountDownLatch latch = new CountDownLatch(1);
  EndListener listener = new EndListener() {
   @Override
   public void end() {
    latch.countDown();
   }
  };
  producer.addEndListener(listener);
 
  producer.startProduce();
  try {
   latch.await();
  } catch (InterruptedException e) {
   System.exit(1);
  }
 }
}

As before, we still have the EndListener to worry about (read “implement”), but that code would be identical to the earlier version, which is why parts of it have been left out of the sources above.

Now if you look at the above code, you will notice that both the producer and the consumer are now very lightweight and decoupled: the producer produces Strings and the consumer consumes Integers. The “bit in the middle” is done simply by decorating a plain buffer. The first decoration, BlockingBuffer, makes it blocking (and synchronized): remove() waits until an element is available instead of failing on an empty buffer, and the two threads cannot corrupt the buffer state. The second decoration, a PredicatedBuffer, provides data validation: we need to make sure that only Integer objects get added to the buffer, since this is what the consumer expects! The last (and most important) decoration applies a transformation on the fly to the data coming in: the same transformation as before, which simply parses a String as an Integer.

So now every time the producer attempts to produce a String, the following happens:

  1. The String is parsed into an Integer (or zero, if parsing fails)
  2. The object produced at step 1 is checked to ensure it is a non-null instance of Integer; otherwise PredicatedBuffer rejects it with an IllegalArgumentException, which the producer catches and ignores. Granted, not the best validation here, but this can grow into something proper (e.g. ensure we only have positive Integers?)
  3. The Integer is added to the underlying buffer in a thread-safe manner, so we don’t have to worry about the integrity of the data in the buffer ourselves
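Step 2 is where the validation could grow. A minimal sketch of the “only positive Integers” idea, written against java.util.function.Predicate rather than the Commons Predicate interface (the class name is hypothetical; in the Commons version the same expression would go inside evaluate(Object)). Be aware that because the transformer maps bad input to zero, a positive-only predicate would also reject non-numeric lines, which the producer would then silently drop instead of letting a zero through.

```java
import java.util.function.Predicate;

/** Hypothetical stricter validation for the buffer: a non-null Integer greater than zero. */
class PositiveIntegerPredicate {
    static final Predicate<Object> POSITIVE_INT =
            o -> o instanceof Integer && (Integer) o > 0; // instanceof is false for null
}
```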

And on the other end the consumer simply blocks until an Integer is available in the buffer, picks it up and displays it on the console!

I somehow find this implementation more intuitive. Think also of all of this in the context of something like Spring, for instance, where this mechanism works even better, since you are separating the transformation from the validation from the synchronization etc. Also, we no longer build the infrastructure (transformation, validation etc.) ourselves: we use a framework that already has it built in and allows us to hook into it. Imagine, for instance, that you had some standard transformers (whether part of Commons Collections or separate) which do this sort of parsing, validation and transformation; in that case you would simply wire them in without writing the code yourself!

The other reason, at the risk of repeating myself, is that it makes the producer and consumer more lightweight: in light of the above, you could end up writing just a consumer, a producer and a main() method that wires everything together (or, as I said before, use something like Spring to build these and put them together).

The only downside of the Buffers in Commons Collections, though, is that they don’t (yet!) use generics; it would be nice to write something like:

Buffer<String> b = SynchronizedBuffer.decorate(new UnboundedFifoBuffer<String>());
String data = b.remove();

rather than

String data = (String)b.remove();

which is the case now.
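Part of what makes generics awkward for the decorated buffer in this post is that it accepts Strings but hands out Integers, so a single Buffer<T> type parameter can’t describe it. A minimal sketch of what a typed version might look like, with hypothetical names and built on java.util.concurrent rather than Commons Collections, separating the input type from the output type:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

/** Hypothetical typed buffer: accepts I on the way in, stores and hands out O. */
class TypedTransformingBuffer<I, O> {
    private final BlockingQueue<O> queue = new LinkedBlockingQueue<>();
    private final Function<? super I, ? extends O> transformer;

    TypedTransformingBuffer(Function<? super I, ? extends O> transformer) {
        this.transformer = transformer;
    }

    /** Transforms the input and stores the result. */
    void add(I input) {
        queue.add(transformer.apply(input));
    }

    /** Blocks until an element is available; the caller gets an O back, no cast. */
    O remove() {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

With new TypedTransformingBuffer<String, Integer>(s -> Integer.valueOf(s)), the producer side can only add Strings and the consumer side reads Integers without any casts.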

I’m sure generics are coming — in fact, having just written this myself, I’ll download the Collections code and see if I can help with that! However, even without the generics I believe this is a pretty cool mechanism to build separation in consumer/producer patterns — and quite likely others too.

Oh, and one last thing: if all you want to do is read a string, parse it as an int and dump it on the console, trust me, stick to the very first, non-OOP implementation; really, all the code above is way too much for that! (Yes, I know and agree with what Bjarne Stroustrup said about OOP 😀 )

As usual, the source code is available for download here: convert.tar.bz2