Wednesday, August 22, 2012

Getting rid of synchronized: Using Akka from Java

I recently gave an internal talk on Akka, the Actor framework for the JVM, at my former company synyx. For the talk I implemented a small example application, a kind of web crawler, using Akka. I published the source code on GitHub and will explain some of the concepts in this post.

Motivation

To see why you might need something like Akka, imagine you want to implement a simple web crawler for offline search. You download pages from a certain location, parse and index the content, and follow any links that you haven't parsed and indexed yet. I am using HtmlParser for downloading and parsing pages and Lucene for indexing them. The logic is contained in two service objects, PageRetriever and Indexer, that can be used from our main application.

A simple sequential execution might then look something like this:

public void downloadAndIndex(String path, IndexWriter writer) {
    VisitedPageStore pageStore = new VisitedPageStore();
    pageStore.add(path);
        
    Indexer indexer = new IndexerImpl(writer);
    PageRetriever retriever = new HtmlParserPageRetriever(path);
        
    String page;
    while ((page = pageStore.getNext()) != null) {
        PageContent pageContent = retriever.fetchPageContent(page);
        pageStore.addAll(pageContent.getLinksToFollow());
        indexer.index(pageContent);
        pageStore.finished(page);
    }
        
    indexer.commit();
}

We start with one page, extract the content and the links, index the content, and store all links that are still to be visited in the VisitedPageStore. This class contains the logic to determine which links have already been visited. We loop as long as there are more links to follow; once we are done, we commit the Lucene IndexWriter.
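
To make the role of the VisitedPageStore a bit more tangible, here is a minimal sketch of how such a store could work. It is not the class from the example project: the name PageStoreSketch and the three internal collections are assumptions, and only the method names are taken from the listing above.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical stand-in for VisitedPageStore; not the code from the repository.
class PageStoreSketch {
    private final Set<String> known = new HashSet<>();         // every link ever added
    private final Queue<String> pending = new ArrayDeque<>();  // added but not yet handed out
    private final Set<String> inProgress = new HashSet<>();    // handed out, not yet finished

    // Queue a link only the first time we see it.
    void add(String page) {
        if (known.add(page)) {
            pending.add(page);
        }
    }

    void addAll(Collection<String> pages) {
        for (String page : pages) {
            add(page);
        }
    }

    // Returns the next unvisited link, or null when nothing is waiting.
    String getNext() {
        String page = pending.poll();
        if (page != null) {
            inProgress.add(page);
        }
        return page;
    }

    void finished(String page) {
        inProgress.remove(page);
    }

    // True once nothing is waiting and nothing is being processed.
    boolean isFinished() {
        return pending.isEmpty() && inProgress.isEmpty();
    }

    public static void main(String[] args) {
        PageStoreSketch store = new PageStoreSketch();
        store.add("/index.html");
        String first = store.getNext();
        // The page links back to itself and to one new page.
        store.addAll(Arrays.asList("/index.html", "/about.html"));
        store.finished(first);
        System.out.println(store.getNext()); // the only link that was new
        System.out.println(store.getNext()); // nothing left to hand out
    }
}
```

Note that nothing in this sketch is thread safe, which is fine as long as only one thread ever touches it.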

This implementation works fine. On my outdated laptop it finishes in around 3 seconds for an example page. (Note that the times I am giving are by no means meant as a benchmark; they are just there to give you some idea of the numbers.)

So are we done? No, of course we can do better by making fuller use of the resources we have available. Let's try to improve this solution by splitting it into several tasks that can be executed in parallel.

Shared State Concurrency

The normal way in Java would be to implement several Threads that each do part of the work and access the state via guarded blocks, e.g. by synchronizing methods. So in our case there might be several Threads that access the global state stored in the VisitedPageStore.
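
As a rough illustration of this style, the following sketch guards a shared set of visited links with synchronized methods and lets several Threads compete for the same links. The class SynchronizedLinkSet and its methods are made up for this example and are not part of the crawler.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical shared store: every access goes through the object's monitor.
class SynchronizedLinkSet {
    private final Set<String> visited = new HashSet<>();

    // Returns true only for the first caller that claims the link.
    synchronized boolean markVisited(String link) {
        return visited.add(link);
    }

    synchronized int size() {
        return visited.size();
    }

    // Lets several threads try to claim the same links and counts the successful claims.
    static int claimConcurrently(int threadCount, int linkCount) {
        final SynchronizedLinkSet store = new SynchronizedLinkSet();
        final AtomicInteger successfulClaims = new AtomicInteger();
        Thread[] workers = new Thread[threadCount];
        for (int t = 0; t < threadCount; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < linkCount; i++) {
                    if (store.markVisited("/page" + i)) {
                        successfulClaims.incrementAndGet();
                    }
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return successfulClaims.get();
    }

    public static void main(String[] args) {
        // Each link should be claimed exactly once, however many threads compete.
        System.out.println(claimConcurrently(4, 1000));
    }
}
```

This works, but only as long as every single access to the set goes through a synchronized method. Forgetting the keyword in one place is enough to break it, and the compiler will not tell you.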

This model is what Venkat Subramaniam calls Synchronize and Suffer in his great book Programming Concurrency on the JVM. Working with Threads and building correct solutions might not seem that hard at first but is inherently difficult. I like those two tweets that illustrate the problem:

Brian Goetz is, of course, the author of the de facto standard book on the new Java concurrency features, Java Concurrency in Practice.

Akka

So what is Akka? It's an Actor framework for the JVM that is implemented in Scala, but that is something you rarely notice when working from Java. It offers a nice Java API that provides most of the functionality in a convenient way.

Actors are a concept that was introduced in the seventies but became widely known as one of the core features of Erlang, a language for building fault-tolerant, self-healing systems. Actors employ the concept of Message Passing Concurrency. That means that Actors only communicate by means of messages that are passed into an Actor's mailbox. Actors can contain state that they shield from the rest of the system. The only way to change the state is by passing in messages. An Actor processes one message at a time. By default it does so on a thread borrowed from a shared pool rather than on a dedicated Thread of its own, so Actors provide a higher level of abstraction than working with Threads directly.

When implementing Actors you put the behaviour in a method, onReceive() in the Java API, that acts on incoming messages. You can then reply asynchronously to the sender or send messages to any other Actor.
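
If you have never seen the idea before, the following toy might help. It is deliberately not Akka: the mailbox is simply the task queue of a single-threaded executor, and the class CounterActorSketch with its methods is invented for this post. What it shows is that many Threads can send messages while the state itself is only ever touched by one thread, without any synchronized.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Not Akka: a toy actor whose mailbox is the queue of a single-threaded executor.
class CounterActorSketch {
    private final ExecutorService mailbox = Executors.newSingleThreadExecutor();
    private int count; // only ever read or written by the mailbox thread

    // Fire and forget: the caller returns immediately.
    void tellIncrement() {
        mailbox.execute(() -> count++);
    }

    // Ask style: the reply arrives later through a future.
    CompletableFuture<Integer> askCount() {
        return CompletableFuture.supplyAsync(() -> count, mailbox);
    }

    void stop() {
        mailbox.shutdown();
    }

    // Several sender threads, one actor, no locks around the counter.
    static int countFromManySenders(int senders, int perSender) {
        CounterActorSketch actor = new CounterActorSketch();
        try {
            Thread[] threads = new Thread[senders];
            for (int i = 0; i < senders; i++) {
                threads[i] = new Thread(() -> {
                    for (int j = 0; j < perSender; j++) {
                        actor.tellIncrement();
                    }
                });
                threads[i].start();
            }
            for (Thread thread : threads) {
                thread.join();
            }
            // Queued behind all increments, so it sees the final value.
            return actor.askCount().join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            actor.stop();
        }
    }

    public static void main(String[] args) {
        System.out.println(countFromManySenders(4, 1000));
    }
}
```

A real Actor framework adds a lot on top of this, for example supervision, routing and sharing a small thread pool between many Actors, but the core contract is the same.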

For our problem at hand an Actor setup might look something like this:

There is one Master Actor that also contains the global state. It sends a message to fetch a certain page to a PageParsingActor, which asynchronously responds to the Master with the PageContent. The Master can then send the PageContent to an IndexingActor, which responds with another message. With this setup we have taken a first step towards scaling our solution. There are now three Actors that can run on different cores of your machine.

Actors are instantiated from other Actors. At the top there's the ActorSystem that is provided by the framework. The MasterActor is instantiated from the ActorSystem:

ActorSystem actorSystem = ActorSystem.create();
final CountDownLatch countDownLatch = new CountDownLatch(1);
ActorRef master = actorSystem.actorOf(new Props(new UntypedActorFactory() {

    @Override
    public Actor create() {
        return new SimpleActorMaster(new HtmlParserPageRetriever(path), writer, countDownLatch);
    }
}));

master.tell(path);
try {
    countDownLatch.await();
    actorSystem.shutdown();
} catch (InterruptedException ex) {
    throw new IllegalStateException(ex);
}

Ignore the CountDownLatch as it is only included to make it possible to terminate the application. Note that we are not referencing an instance of our class but an ActorRef, a reference to an actor. You will see later why this is important.

The MasterActor contains references to the other Actors and creates them from its context. This makes the two Actors children of the Master:

public SimpleActorMaster(final PageRetriever pageRetriever, final IndexWriter indexWriter,
    final CountDownLatch latch) {

    super(latch);
    this.indexer = getContext().actorOf(new Props(new UntypedActorFactory() {

        @Override
        public Actor create() {

            return new IndexingActor(new IndexerImpl(indexWriter));
        }
    }));

    this.parser = getContext().actorOf(new Props(new UntypedActorFactory() {

        @Override
        public Actor create() {

           return new PageParsingActor(pageRetriever);
        }
    }));
}

The PageParsingActor acts on messages to fetch pages and sends a message with the result to the sender:

public void onReceive(Object o) throws Exception {
    if (o instanceof String) {
        PageContent content = pageRetriever.fetchPageContent((String) o);
        getSender().tell(content, getSelf());
    } else {
        // report any message we don't expect as unhandled
        unhandled(o);
    }
}

The IndexingActor holds some state in the form of the Indexer. It acts on messages to index pages and to commit the indexing process:

public void onReceive(Object o) throws Exception {
    if (o instanceof PageContent) {
        PageContent content = (PageContent) o;
        indexer.index(content);
        getSender().tell(new IndexedMessage(content.getPath()), getSelf());
    } else if (COMMIT_MESSAGE == o) {
        indexer.commit();
        getSender().tell(COMMITTED_MESSAGE, getSelf());
    } else {
        unhandled(o);
    }
}
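
The message types themselves are not shown in this post. As a rough guess at their shape, they could be as simple as the sketch below: an immutable value class plus two marker objects that are compared by identity. All class names ending in Sketch are made up, and the describe() method only exists to mimic the branching of onReceive() without needing Akka on the classpath.

```java
// Hypothetical message types; the constant names mirror the post, the bodies are guesses.
final class IndexedMessageSketch {
    final String path; // immutable, so it is safe to hand to another Actor

    IndexedMessageSketch(String path) {
        this.path = path;
    }
}

final class SignalsSketch {
    // Unique marker objects, compared by identity inside one JVM.
    static final Object COMMIT_MESSAGE = new Object();
    static final Object COMMITTED_MESSAGE = new Object();

    private SignalsSketch() {
    }

    // Mirrors the branching style of onReceive() without any Akka types.
    static String describe(Object message) {
        if (message instanceof IndexedMessageSketch) {
            return "indexed " + ((IndexedMessageSketch) message).path;
        } else if (message == COMMIT_MESSAGE) {
            return "commit";
        } else if (message == COMMITTED_MESSAGE) {
            return "committed";
        }
        return "unhandled";
    }

    public static void main(String[] args) {
        System.out.println(describe(new IndexedMessageSketch("/index.html")));
        System.out.println(describe(COMMIT_MESSAGE));
        System.out.println(describe("something else"));
    }
}
```

Comparing markers with == is only reliable as long as all Actors live in the same JVM. Once messages are serialized, an enum or a dedicated class would be the safer choice.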

Finally, the MasterActor orchestrates the other Actors in its onReceive() method. It starts with one page and sends it to the PageParsingActor. It keeps the valuable state of the application in the VisitedPageStore. When no more pages are to be fetched and indexed, it sends a commit message and terminates the application.

public void onReceive(Object message) throws Exception {

    if (message instanceof String) {
        // start
        String start = (String) message;
        visitedPageStore.add(start);
        getParser().tell(visitedPageStore.getNext(), getSelf());
    } else if (message instanceof PageContent) {
        PageContent content = (PageContent) message;
        getIndexer().tell(content, getSelf());
        visitedPageStore.addAll(content.getLinksToFollow());

        if (visitedPageStore.isFinished()) {
            getIndexer().tell(IndexingActor.COMMIT_MESSAGE, getSelf());
        } else {
            for (String page : visitedPageStore.getNextBatch()) {
                getParser().tell(page, getSelf());
            }
        }
    } else if (message instanceof IndexedMessage) {
        IndexedMessage indexedMessage = (IndexedMessage) message;
        visitedPageStore.finished(indexedMessage.path);

        if (visitedPageStore.isFinished()) {
            getIndexer().tell(IndexingActor.COMMIT_MESSAGE, getSelf());
        }
    } else if (message == IndexingActor.COMMITTED_MESSAGE) {
        logger.info("Shutting down, finished");
        getContext().system().shutdown();
        countDownLatch.countDown();
    }
}

What happens if we run this example? Unfortunately it now takes around 3.5 seconds on my dual-core machine. Though we are now able to run on both cores, we have actually made the application slower. This is probably an important lesson: when building scalable applications you may introduce overhead that decreases performance at small scale. Scalability is not about increasing performance but about the ability to distribute the load.

So was it a failure to switch to Akka? Not at all. It turns out that the application spends most of its time fetching and parsing pages, which includes waiting for the network. Indexing in Lucene is blazing fast and the Master mostly just dispatches messages. So what can we do about it? We have already split our application into smaller chunks. Fortunately the PageParsingActor doesn't contain any state at all. That means we can easily parallelize its tasks.

This is where talking to references becomes important. For the sender it is transparent whether there is one Actor or a million Actors behind a reference. A reference can point to a router, which forwards each incoming message to one of any number of Actors behind it.

We only need to change the instantiation of the Actor, the rest of the application remains the same:

parser = getContext().actorOf(new Props(new UntypedActorFactory() {

        @Override
        public Actor create() {

            return new PageParsingActor(pageRetriever);
        }
}).withRouter(new RoundRobinRouter(10)));

By using a router the Akka framework automatically takes care that there are 10 Actors available. The RoundRobinRouter hands the incoming messages to these Actors in turn. This takes the runtime down to 2 seconds.
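
The selection rule behind round-robin routing is simple enough to sketch in a few lines of plain Java. This is not Akka's RoundRobinRouter, just a toy under the assumption that routees are picked in a fixed cycle; the class RoundRobinSketch and the routee names are invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Not Akka's RoundRobinRouter: a toy that only shows the selection rule.
class RoundRobinSketch<T> {
    private final List<T> routees;
    private final AtomicLong next = new AtomicLong();

    RoundRobinSketch(List<T> routees) {
        if (routees.isEmpty()) {
            throw new IllegalArgumentException("at least one routee is required");
        }
        this.routees = new ArrayList<>(routees);
    }

    // Picks routees in a fixed cycle, regardless of how busy each one is.
    T select() {
        int index = (int) (next.getAndIncrement() % routees.size());
        return routees.get(index);
    }

    public static void main(String[] args) {
        RoundRobinSketch<String> router =
                new RoundRobinSketch<>(Arrays.asList("parser-0", "parser-1", "parser-2"));
        for (int i = 0; i < 5; i++) {
            System.out.println("/page" + i + " -> " + router.select());
        }
    }
}
```

Because the rule ignores how busy a routee is, a slow page can still hold up the messages queued behind it on the same routee. For a crawler with wildly different page sizes a different routing strategy might be worth a look.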

A word on Blocking

Note that the way I am doing network requests here is not recommended in Akka. HtmlParser performs blocking network calls, which is something to reconsider carefully when designing a reactive system. In fact, as this application is highly network bound, we might gain even more by just using an asynchronous networking library. But hey, then I wouldn't be able to tell you how nice it is to use Akka. In a future post I will highlight some more Akka features that can help to make our application more robust and fault tolerant.
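
To make the point about blocking a little more concrete: if you cannot get rid of a blocking call, one common mitigation is to confine it to its own thread pool so it cannot starve the rest of the system. The sketch below shows that idea in plain Java. It is not what the example application does, it is not an asynchronous networking library either, and fetchBlocking() is just a stand-in that sleeps instead of touching the network.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingIsolationSketch {

    // Stand-in for a blocking download; a real retriever would wait on the network here.
    static String fetchBlocking(String url) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return "content of " + url;
    }

    // Runs all blocking calls on a dedicated pool and collects the results in input order.
    static List<String> fetchAll(List<String> urls, int poolSize) {
        ExecutorService blockingPool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<String>> pending = new ArrayList<>();
            for (String url : urls) {
                pending.add(CompletableFuture.supplyAsync(() -> fetchBlocking(url), blockingPool));
            }
            List<String> results = new ArrayList<>();
            for (CompletableFuture<String> future : pending) {
                results.add(future.join());
            }
            return results;
        } finally {
            blockingPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(Arrays.asList("/index.html", "/about.html"), 2));
    }
}
```

The pool size then becomes the knob that limits how many downloads can block at the same time.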