Tuesday, January 22, 2013

Synchronized is for swimming only

I am rather embarrassed to admit that I was recently bitten by the synchronized keyword in Java... again...  I have this motto that if you think you understand multi-threading, you don't.  It is by far one of the most complex and tricky things to get right... and by far the most difficult to debug (in my humblest of opinions of course). It is also one of those kinds of problems that can be hardware-specific (typically, a faster computer makes it break quicker). When a method is declared as synchronized, i.e.

public static synchronized void someMethod() {
...
}
...then the body of that method is a critical section with respect to the lock it acquires: only one thread holding that lock can be inside at any one point in time.  The synchronized modifier needs an object to lock on, of course.  For static methods (i.e. methods associated with the class), the lock is on the class object, i.e. Someclass.class, while instance methods lock on the instance of that class (i.e. the object itself).  This has a pesky side-effect that one must be aware of.
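As a quick check of which object each kind of synchronized method locks on, here is a minimal sketch. The class LockTargets and its method names are hypothetical, made up purely for illustration; the only library call used is Thread.holdsLock, which reports whether the current thread holds the monitor of a given object.

```java
// Hypothetical demo class; names are illustrative only.
class LockTargets {

    // A static synchronized method acquires the monitor of the Class object.
    static synchronized boolean staticHoldsClassLock() {
        return Thread.holdsLock(LockTargets.class);
    }

    // An instance synchronized method acquires the monitor of 'this'.
    synchronized boolean instanceHoldsThisLock() {
        return Thread.holdsLock(this);
    }

    // An instance synchronized method does not acquire the class monitor.
    synchronized boolean instanceHoldsClassLock() {
        return Thread.holdsLock(LockTargets.class);
    }

    public static void main(String[] args) {
        LockTargets target = new LockTargets();
        System.out.println("static method holds class lock: " + staticHoldsClassLock());
        System.out.println("instance method holds this lock: " + target.instanceHoldsThisLock());
        System.out.println("instance method holds class lock: " + target.instanceHoldsClassLock());
    }
}
```

The first two checks report true and the third reports false, so a static synchronized method and an instance synchronized method of the same class do not block each other, while all static synchronized methods of one class do.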

Consider the following code:

public class StaticMethodSynchronisation {
public static synchronized void foo() throws InterruptedException {
System.out.print("I am in foo: ");
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
Thread.sleep(100);
}
System.out.println("done with foo...");
}
public static synchronized void bar() throws InterruptedException {
System.out.print("I am in bar: ");
for (int i = 0; i < 10; i++) {
System.out.print((char) (65 + i) + " ");
Thread.sleep(100);
}
System.out.println("done with bar...");
}
public static void main(String[] args) {
// Create 3 threads to run foo.
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
foo();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// Create 3 threads to run bar.
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
bar();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
Since a static synchronized method locks on the class object itself, only one thread can be in any static synchronized method of that class at any one point in time.  Herein lies the confusion: it is natural to think that two different threads can be in two different synchronized methods at the same time. However, this is not the case. The output of the above code is as follows, which is unexpected (well, for people like me); the exact order of the lines may vary from run to run:

I am in foo: 0 1 2 3 4 5 6 7 8 9 done with foo...
I am in bar: A B C D E F G H I J done with bar...
I am in bar: A B C D E F G H I J done with bar...
I am in bar: A B C D E F G H I J done with bar...
I am in foo: 0 1 2 3 4 5 6 7 8 9 done with foo...
I am in foo: 0 1 2 3 4 5 6 7 8 9 done with foo...
Fortunately, Java 1.5 introduced explicit locks (the Lock interface in java.util.concurrent.locks).  A lock allows a specific section of code to be guarded by a given lock object and therefore defined as a critical section in isolation from other critical sections within the same class or object.  Therefore, given the following code:

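import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class StaticMethodSynchronisation {
static final Lock FOOLOCK = new ReentrantLock();
static final Lock BARLOCK = new ReentrantLock();
public static void foo() throws InterruptedException {
// Acquire the lock before entering try, so that finally only unlocks a lock we actually hold.
FOOLOCK.lock();
try {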
System.out.print("I am in foo: ");
for (int i = 0; i < 10; i++) {
System.out.print(i + " ");
Thread.sleep(100);
}
System.out.println("done with foo...");
} finally {
FOOLOCK.unlock();
}
}
public static void bar() throws InterruptedException {
// Acquire the lock before entering try, so that finally only unlocks a lock we actually hold.
BARLOCK.lock();
try {
System.out.print("I am in bar: ");
for (int i = 0; i < 10; i++) {
System.out.print((char) (65 + i) + " ");
Thread.sleep(100);
}
System.out.println("done with bar...");
} finally {
BARLOCK.unlock();
}
}
public static void main(String[] args) {
// Create 3 threads to run foo.
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
foo();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
// Create 3 threads to run bar.
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
bar();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
Here, one thread can be in foo() while another is in bar(), giving the expected (interleaved) output:

I am in foo: 0 I am in bar: A 1 B 2 C 3 D 4 E 5 F 6 G 7 H 8 I 9 J done with foo...
I am in foo: 0 done with bar...
I am in bar: A 1 B 2 C 3 D 4 E 5 F 6 G 7 H I 8 9 J done with foo...
done with bar...
I am in foo: 0 I am in bar: A 1 B 2 C 3 D 4 E 5 F 6 G 7 H 8 I 9 J done with foo...
done with bar...
From the above, it is clear that foo() and bar() are executed simultaneously. However, multiple calls to foo() are not executed at the same time (and the same goes for bar()). This can be confirmed since two occurrences of the text "I am in foo" are always separated by "done with foo" (likewise with bar()).

Locks also allow multiple methods to be tied together in all kinds of arrangements. For example, if we write a method called fooFriend() and use FOOLOCK to guard it, only one thread at a time can be in the critical section of either foo() or fooFriend(). This makes a Java lock a rather powerful construct.
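The fooFriend() arrangement could be sketched roughly as follows. Everything here beyond FOOLOCK, foo() and fooFriend() is an assumption added for illustration: the class SharedLockDemo, the two counters, and the helper methods exist only to measure how many threads are ever inside the two critical sections at once.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: foo() and fooFriend() share one lock, FOOLOCK.
class SharedLockDemo {
    static final Lock FOOLOCK = new ReentrantLock();
    // Number of threads currently inside either critical section.
    static final AtomicInteger inside = new AtomicInteger();
    // Highest value 'inside' has reached so far.
    static final AtomicInteger maxInside = new AtomicInteger();

    static void foo() throws InterruptedException {
        FOOLOCK.lock();
        try {
            criticalWork();
        } finally {
            FOOLOCK.unlock();
        }
    }

    static void fooFriend() throws InterruptedException {
        FOOLOCK.lock(); // same lock as foo()
        try {
            criticalWork();
        } finally {
            FOOLOCK.unlock();
        }
    }

    // Records how many threads are inside at once, then lingers briefly.
    private static void criticalWork() throws InterruptedException {
        int now = inside.incrementAndGet();
        maxInside.accumulateAndGet(now, Math::max);
        try {
            Thread.sleep(20);
        } finally {
            inside.decrementAndGet();
        }
    }

    private static void call(boolean friend) {
        try {
            if (friend) {
                fooFriend();
            } else {
                foo();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs three threads per method and returns the peak concurrency observed.
    static int peakConcurrency() {
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            threads.add(new Thread(() -> call(false)));
            threads.add(new Thread(() -> call(true)));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return maxInside.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + peakConcurrency());
    }
}
```

Because both methods acquire the same lock, the peak stays at 1 even though six threads compete. Giving fooFriend() its own lock would allow the peak to reach 2.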

The try-finally block is rather important: it ensures that the lock is released no matter what. Without it, an exception thrown inside the critical section would leave the lock owned by the original thread, and every other thread waiting on that lock would block forever.
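A small sketch of that guarantee, under the assumption of a hypothetical class UnlockInFinallyDemo whose critical section always throws. ReentrantLock.isLocked() is used afterwards to check whether any thread still holds the lock.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; names are illustrative only.
class UnlockInFinallyDemo {
    static final ReentrantLock LOCK = new ReentrantLock();

    // A critical section that always fails.
    static void failingSection() {
        LOCK.lock();
        try {
            throw new IllegalStateException("simulated failure inside the critical section");
        } finally {
            // Runs even though the try block threw, so the lock is released.
            LOCK.unlock();
        }
    }

    // Returns true if the lock is free after the critical section threw.
    static boolean lockFreeAfterFailure() {
        try {
            failingSection();
        } catch (IllegalStateException expected) {
            // The exception still propagates to the caller; only the lock state matters here.
        }
        return !LOCK.isLocked();
    }

    public static void main(String[] args) {
        System.out.println("lock free after failure: " + lockFreeAfterFailure());
    }
}
```

With the finally block in place this reports true. If the unlock() call were moved to the end of the try block instead, it would be skipped by the exception and the lock would stay held.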

Indeed, synchronized can be a rather embarrassing monster when it rears its ugly head.

Tuesday, January 15, 2013

Apache Camel

Good day all.  My first entry in this blog is all about Apache Camel for Java-based solutions.

Camel is a powerful open-source integration framework based on known Enterprise Integration Patterns. Camel lets you define routing and mediation rules in a variety of domain-specific languages or even in XML.  It uses URIs to work directly with any kind of transport or messaging model, such as HTTP and JMS, and also allows you to define your own.

What I particularly like is the small library with minimal dependencies, which makes embedding the framework within a Java application seamless. Furthermore, you work with the same API regardless of the transport used.  This makes a plug-and-play application, where one endpoint can be swapped for another, quite possible.

The demonstration application that I will discuss here is inspired by my new employer,  Mediswitch.  One of their products connects various medical aids and other third parties with all kinds of service providers (each using a different communication medium).  We will define two endpoints (a rather ambiguous term referring to either a URL/URI or an entire service) for our application:
  1. The first endpoint is an HTTP-GET client.  I like these in tests as they can be simply called from a web browser or using good old curl on the command line.
  2. A server using Thrift that services the request from the first endpoint and returns the result via the central switch.
Our application will connect these two endpoints.  Two important constructs within Camel are the Processor and the Component.  A Component models either a consumer (we get stuff in) or a producer (we send stuff out) endpoint.  A Processor does something with the request and may sometimes also be used to create a component-like entity to communicate with an endpoint.  This is often preferred if the endpoint is a once-off use.  Typically you will write a Processor first and, once it is required a second time, convert it into a Component.

For this application we will require the following:
  1. Component (already implemented for us) to handle the incoming HTTP-GET requests.
  2. Processor for each of the possible requests received by the consumer component (i.e. the HTTP-GET side). For this application we will service four different requests:
    1. Getting a member's details.
    2. Modifying a member's detail.
    3. Deleting a member (I called it 'zap' for some reason).
    4. Process a claim submitted.
  3. Processor in between that does something additional with the request.
  4. A mechanism to wire all these things up and a main class that kicks off this process.
Let's start with the Maven dependencies.  We need to include the core Camel dependency as well as something to handle the HTTP-GET.  One of the existing consumer-based components is the Camel Jetty component.  Our pom file will therefore include two Camel dependencies: camel-core and camel-jetty (both in the org.apache.camel group).

We do not need to write the Component, so next is the Processor to handle the Thrift call:

public class GetMemberDetailProcessor extends AbstractMedicalServiceProcessor {
@Override
public void process(Exchange exchange) throws Exception {
HttpServletRequest request = exchange.getIn().getBody(HttpServletRequest.class);
String id = request.getParameter("id");
if (id == null) {
throw new ServletException("id cannot be empty");
}
logger.debug("Request from {}, trying to retrieve member, [{}].", request.getRequestURL(), id);
ThriftMember member = getThriftClient().getMemberDetail(id);
exchange.getOut().setBody(String.format("Member, %s successfully retrieved", member.toString()));
}
}
The Thrift call is handled by the ThriftMember member = getThriftClient().getMemberDetail(id); line. The Thrift IDL and the detail on how Thrift works are beyond the scope of this article. All I can say is that it is one of the best communication frameworks that I have worked with and it is quite easy to get going. Our GetMemberDetailProcessor class overrides the public void process(Exchange exchange) throws Exception method from the Processor interface. I handle the Thrift-connection aspects in the inherited abstract class, which looks like this:

public abstract class AbstractMedicalServiceProcessor implements Processor {
private static final String HOST = "localhost";
private static final int PORT = 9000;
protected static Logger logger = LoggerFactory.getLogger(AbstractMedicalServiceProcessor.class);
private MedicalService.Client client;
public AbstractMedicalServiceProcessor() {
try {
logger.debug("Setting up Thrift client to query server on {}:{}.", HOST, PORT);
TTransport transport = new TFramedTransport(new TSocket(HOST, PORT));
TProtocol protocol = new TBinaryProtocol(transport);
client = new MedicalService.Client(protocol);
transport.open();
logger.debug("Thrift client sucessfully configured.");
} catch (TTransportException e) {
e.printStackTrace();
}
}
protected MedicalService.Client getThriftClient() {
return client;
}
}
The other Processor classes follow exactly the same pattern (and are therefore left as an exercise to you, the avid reader).  To test chaining (which I'll show later when we wire everything together), I've written a DummyProcessor class, which looks like:

public class DummyProcessor implements Processor {
protected static Logger logger = LoggerFactory.getLogger(DummyProcessor.class);
@Override
public void process(Exchange exchange) throws Exception {
logger.debug("********************************************************");
logger.debug("Detail of this request:");
logger.debug(exchange.getFromEndpoint().getEndpointUri());
logger.debug(exchange.getIn().getHeaders().toString());
logger.debug("********************************************************");
}
}
...and finally, all this needs to be wired up using a subclass of the RouteBuilder class.  Camel uses (amongst others) Java code to implement a DSL (domain-specific language). This is where the true power of the framework is evident.  There are two ways it can be wired up.  I've commented out one approach:

public class MedicalServiceRouteBuilder extends RouteBuilder {
@Override
public void configure() throws Exception {
from("jetty:http://localhost:8080?matchOnUriPrefix=true").process(new DummyProcessor()).choice()
.when(header("CamelHttpPath").isEqualTo("/modifymember")).process(new ModifyMemberProcessor())
.when(header("CamelHttpPath").isEqualTo("/zapmember")).process(new ZapMemberProcessor())
.when(header("CamelHttpPath").isEqualTo("/getmemberdetail")).process(new GetMemberDetailProcessor())
.when(header("CamelHttpPath").isEqualTo("/claim")).process(new ClaimProcessor());
/*
from("jetty:http://localhost:8080/modifymember").process(new DummyProcessor()).process(new ModifyMemberProcessor());
from("jetty:http://localhost:8080/zapmember").process(new DummyProcessor()).process(new ZapMemberProcessor());
from("jetty:http://localhost:8080/getmemberdetail").process(new DummyProcessor()).process(new GetMemberDetailProcessor());
from("jetty:http://localhost:8080/claim").process(new DummyProcessor()).process(new ClaimProcessor());
*/
}
}
In the first approach a single route is used, where we route the request to the DummyProcessor, followed by a choice(), which, based on the CamelHttpPath header, routes the request to the correct Processor. The second approach sets up four different routes rather than one.  As far as I understand, these two approaches are equivalent.  I prefer the first approach (in the same way that brown is my favourite colour).
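To make the single-route idea more tangible, here is a plain-Java model of what the choice() and when() steps do conceptually. This is not Camel's API and not how Camel implements it: the class PathRouterSketch, its when() and route() methods, and the "[seen]" prefix that stands in for the DummyProcessor are all assumptions invented for this sketch. The "no route" fallback for an unmatched path is likewise my own choice.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Plain-Java model of routing by request path; NOT Camel's API.
class PathRouterSketch {
    // Maps a request path to the handler that should process it.
    private final Map<String, Function<String, String>> handlers = new LinkedHashMap<>();

    // Registers a handler for one path and returns this router for chaining.
    PathRouterSketch when(String path, Function<String, String> handler) {
        handlers.put(path, handler);
        return this;
    }

    // Every request first passes a shared step, then the matching handler.
    String route(String path, String id) {
        String seen = "[seen] "; // stand-in for the shared pre-processing step
        Function<String, String> handler = handlers.get(path);
        if (handler == null) {
            return seen + "no route for " + path;
        }
        return seen + handler.apply(id);
    }

    // Builds a router with two hypothetical handlers.
    static PathRouterSketch build() {
        return new PathRouterSketch()
                .when("/getmemberdetail", id -> "get member " + id)
                .when("/zapmember", id -> "zap member " + id);
    }

    public static void main(String[] args) {
        PathRouterSketch router = build();
        System.out.println(router.route("/getmemberdetail", "123"));
        System.out.println(router.route("/unknown", "123"));
    }
}
```

One entry point with a lookup table corresponds to the first approach; the second approach would amount to four independent entry points, each with its own shared step followed by its own handler.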

Wiring everything up requires creating a CamelContext object, adding the routes and starting the context, which starts the threads and internal processes that get Camel going:

public class CamelMedicalService {
public static void main(String[] args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new MedicalServiceRouteBuilder());
context.start();
// Wait ten minutes and then stop everything (context.start() is non-blocking).
Thread.sleep(TimeUnit.MINUTES.toMillis(10));
context.stop();
}
}
Notice that the new-keyword above implies that Camel will work well with a dependency injection framework like Guice (if you don't know about it, you have to have a look!).

Now we can test the application.  After starting the Thrift server (a separate little application to allow testing of our central switch application), we can issue a command from either the browser or the command line.  Let's use a browser this time:


The log messages (after trimming down the org.apache entries in the log4j.xml file) look like this (notice the entries written by the DummyProcessor):

09:58:03,576 DEBUG [AbstractMedicalServiceProcessor] Setting up Thrift client to query server on localhost:9000.
09:58:03,585 DEBUG [AbstractMedicalServiceProcessor] Thrift client sucessfully configured.
09:58:03,585 DEBUG [AbstractMedicalServiceProcessor] Setting up Thrift client to query server on localhost:9000.
09:58:03,585 DEBUG [AbstractMedicalServiceProcessor] Thrift client sucessfully configured.
09:58:03,586 DEBUG [AbstractMedicalServiceProcessor] Setting up Thrift client to query server on localhost:9000.
09:58:03,586 DEBUG [AbstractMedicalServiceProcessor] Thrift client sucessfully configured.
09:58:03,587 DEBUG [AbstractMedicalServiceProcessor] Setting up Thrift client to query server on localhost:9000.
09:58:03,587 DEBUG [AbstractMedicalServiceProcessor] Thrift client sucessfully configured.
09:58:07,992 DEBUG [DummyProcessor] ********************************************************
09:58:07,992 DEBUG [DummyProcessor] Detail of this request:
09:58:07,992 DEBUG [DummyProcessor] http://localhost:8080?matchOnUriPrefix=true
09:58:07,992 DEBUG [DummyProcessor] {id=123, Host=localhost:8080, Content-Type=null, CamelHttpPath=/getmemberdetail, CamelHttpMethod=GET, CamelHttpServletResponse=HTTP/1.1 200
, CamelHttpServletRequest=[GET /getmemberdetail?id=123]@1767965669 org.eclipse.jetty.server.Request@696103e5, CamelHttpQuery=id=123, Accept=*/*, CamelHttpUrl=http://localhost:8080/getmemberdetail, User-Agent=curl/7.27.0, CamelHttpUri=/getmemberdetail}
09:58:07,992 DEBUG [DummyProcessor] ********************************************************
09:58:07,996 DEBUG [AbstractMedicalServiceProcessor] Request from http://localhost:8080/getmemberdetail, trying to retrieve member, [123].
09:58:26,076 DEBUG [DummyProcessor] ********************************************************
09:58:26,076 DEBUG [DummyProcessor] Detail of this request:
09:58:26,076 DEBUG [DummyProcessor] http://localhost:8080?matchOnUriPrefix=true
09:58:26,076 DEBUG [DummyProcessor] {Content-Type=null, id=123, CamelHttpServletRequest=[GET /claim?id=123&amount=200]@847205607 org.eclipse.jetty.server.Request@327f54e7, CamelHttpUrl=http://localhost:8080/claim, CamelHttpServletResponse=HTTP/1.1 200
, User-Agent=curl/7.27.0, CamelHttpPath=/claim, CamelHttpMethod=GET, CamelHttpUri=/claim, amount=200, Accept=*/*, CamelHttpQuery=id=123&amount=200, Host=localhost:8080}
09:58:26,076 DEBUG [DummyProcessor] ********************************************************
09:58:26,077 DEBUG [AbstractMedicalServiceProcessor] Request from http://localhost:8080/claim, trying to claim for member, [id=123, amount=200.0].
I ran into a number of errors while writing this application.  They were usually related to either an incorrect dependency/version or an incorrect route being used.

In summary, Camel makes routing between unknown endpoints as easy as possible.  Notice how the internal Jetty boilerplate code is completely absent from the code.  It is all handled internally and setting it up essentially required a single line of code.  The chaining between Processors also makes it easy to intercept a request for some pre-processing before passing it on.