Performing analytics on Big Data is a hot topic these days. Many organizations have realized that they can gain valuable insight from the data that flows through their systems, both in real time and by researching historical data. Imagine what Google, Facebook or Twitter can learn from the data that flows through their systems. And indeed the boom of real time analytics is here: Google Analytics, Facebook Social Analytics, Twitter paid tweet analytics and many others, including start-ups that specialize in real time analytics.
But the challenge of analyzing such huge volumes of data is enormous. Mining Terabytes and Petabytes of data is no simple task, and it is one that traditional databases cannot meet, which drove the invention of new Big Data and NoSQL technologies such as Hadoop, Cassandra and MongoDB. Analyzing data in real time is yet another challenging task when dealing with very high throughputs. For example, according to Twitter’s statistics, the number of tweets sent on March 11 2011 was 177 million! Now, analyzing this stream, that’s a challenge!
Standing up to the challenge
When discussing the challenge of real time analytics on Big Data, I oftentimes use the Twitter use case as an example. People find it easy to relate to this use case and appreciate the volumes and challenges of such a system. A few weeks ago, when planning a workshop on real time analytics for Big Data, I was challenged to take up this use case and design a proof of concept that stands up to the challenge of analyzing real tweets from real time feeds. Well, challenge is my name, and solution architecture is my game…
Note that it is by no means a complete solution, but more of a thought exercise, and I’d like to share these thoughts with you, as well as the code itself, which is shared on GitHub (see the bottom of the post). I hope this will only be the starting point of a joint community effort to make it into a complete reference example. So let’s have a look at what I sketched up.
What kind of analytics?
First of all, we need to see what kinds of analytics are of interest in such a use case. Looking at Twitter analytics, we see various types of analytics that I group into three categories:
- Counting: real-time counting analytics such as how many requests per day, how many sign-ups, how many times a certain word appears, etc.
- Correlation: near-real-time analytics such as desktop vs. mobile users, which devices fail at the same time, etc.
- Research: more in-depth analytics that run in batch mode on the historical data such as what features get re-tweeted, detecting sentiments, etc.
When approaching the architecture of a solution that covers all of the above types of analytics, we need to recognize the different nature of real time vs. historical analysis, leverage the appropriate technology to meet each challenge on its own ground, and then combine the technologies into a single harmonious solution. I’ll get back to that point…
In my sample application I wanted to listen to the public timeline of Twitter, perform some sample real-time word-counting analytics, prepare the data for research analytics, and combine it all into a single solution that handles all aspects.
Feeding in a hefty stream of tweets
I chose to listen to the Twitter public timeline (so I can share the application with you without having to give you my user/password).
For integration with Twitter I chose to use Spring Social, which offers a built-in Twitter connector integrating with Twitter’s REST API. I wanted to integrate with Twitter’s Streaming API, but unfortunately it appears that currently Spring Social does not support this API, so I settled for repetitive calls to the regular API.
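The repetitive-call approach can be sketched roughly as follows. This is a minimal illustration and not the project's actual feeder: `RawTweet` and `TimelinePoller` are hypothetical names, the `Supplier` stands in for whatever REST call returns the latest public tweets, and the sketch assumes tweet IDs grow over time so that overlapping poll results can be filtered out.

```java
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical minimal tweet holder; not a Spring Social type. */
final class RawTweet {
    final long id;
    final String text;

    RawTweet(long id, String text) {
        this.id = id;
        this.text = text;
    }
}

/** Polls a timeline source repeatedly and forwards only tweets it has not seen yet. */
final class TimelinePoller {
    private final Supplier<List<RawTweet>> timeline; // stands in for the REST call (assumption)
    private final Consumer<RawTweet> sink;           // e.g. the feeder's write path
    private long lastSeenId = -1;

    TimelinePoller(Supplier<List<RawTweet>> timeline, Consumer<RawTweet> sink) {
        this.timeline = timeline;
        this.sink = sink;
    }

    /** Runs one polling round and returns how many new tweets were forwarded. */
    synchronized int pollOnce() {
        int forwarded = 0;
        long maxId = lastSeenId;
        for (RawTweet tweet : timeline.get()) {
            if (tweet.id > lastSeenId) { // skip tweets already seen in earlier rounds
                sink.accept(tweet);
                forwarded++;
                maxId = Math.max(maxId, tweet.id);
            }
        }
        lastSeenId = maxId;
        return forwarded;
    }

    /** Schedules pollOnce() with a fixed delay between rounds; the caller shuts the scheduler down. */
    ScheduledExecutorService start(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(this::pollOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }
}
```

The polling delay is exactly the kind of parameter worth exposing through configuration, since it trades latency against the rate limits of the remote API.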
A feeder took in the tweets and converted them in an ETL style to a canonical Document-oriented representation, whose semi-structured nature makes it ideal for the evolving nature of tweet structure, and wrote them to an in-memory data grid (IMDG) on the server side.
The design needs to accommodate a very high throughput of around 10k tweets/sec, with latency of milliseconds. To that end I chose to implement the feeder as an independent processing unit in GigaSpaces XAP, so that I can cope with the write scalability requirement by simply adding more parallel feeders to handle the stream. Since the feeder is a stateless service, scaling out by adding instances is relatively easy. Trying to do the same with stateful services will prove to be much more challenging, as we’re about to find out …
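To illustrate the idea of a canonical, semi-structured document, here is a minimal sketch that uses a plain `Map` as the document. The class name `TweetDocuments` and the field names (`id`, `text`, `fromUser`) are assumptions made for the example; the actual project uses its data grid's document type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper that turns tweet fields into a schema-free document. */
final class TweetDocuments {
    private TweetDocuments() {
    }

    /**
     * Builds a document from the core tweet fields plus any optional extras.
     * Extras are copied first so that the core fields always win on a name clash.
     */
    static Map<String, Object> toDocument(long id, String text, String fromUser,
                                          Map<String, Object> extras) {
        Map<String, Object> doc = new LinkedHashMap<>();
        if (extras != null) {
            doc.putAll(extras); // fields that may appear or disappear as the tweet format evolves
        }
        doc.put("id", id);
        doc.put("text", text);
        doc.put("fromUser", fromUser);
        return doc;
    }
}
```

Because the document is just a set of named properties, a new tweet attribute only adds an entry to the map and requires no schema change downstream.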
Let’s pick the brains of my accumulated tweets
On the server side, I wanted to store the tweets and prepare them for batch-mode historical research. For the same reason of semi-structured data, I chose a schema-flexible NoSQL database to store the tweets: the open source Apache Cassandra, a column-family store that has become a prominent NoSQL database and is in use by Twitter itself, as well as by many other companies. To interact with Cassandra I used the Hector client API.
To avoid tight coupling of my application code with Cassandra, I followed the Inversion of Control principle (IoC) and created an abstraction layer for the persistent store, where Cassandra is just one implementation, and provided another implementation, which persists to the local file system, for testing purposes. Thanks to the Spring Framework wiring capabilities (see below), switching between implementations becomes a configuration choice, with no code changes.
For easy configuration and wiring I used Spring Framework, leveraging its wiring capabilities as well as properties injection and parameter configuration. Using these features I made the application highly configurable, exposing the Twitter connection details, buffer sizes, thread pool sizes, etc. This means that the application can be tuned to the expected load, depending on the hardware, network and JVM specifications.
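The abstraction layer could look roughly like the sketch below. The names `TweetArchive` and `FileTweetArchive` are hypothetical and chosen for illustration; a Cassandra-backed class would implement the same interface, and is left out here to avoid guessing at client calls.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical persistence abstraction: application code depends only on this interface. */
interface TweetArchive {
    void writeBulk(List<Map<String, Object>> documents);
}

/** Test-oriented implementation that appends one line per document to a local file. */
final class FileTweetArchive implements TweetArchive {
    private final Path file;

    FileTweetArchive(Path file) {
        this.file = file;
    }

    @Override
    public void writeBulk(List<Map<String, Object>> documents) {
        // Map.toString() is used as a simple stand-in for a real serialization format.
        List<String> lines = documents.stream()
                .map(Object::toString)
                .collect(Collectors.toList());
        try {
            Files.write(file, lines, StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, the wiring layer decides which `TweetArchive` gets injected, and the processors that call `writeBulk` never learn which store sits behind it.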
What can I learn from the tweet stream on the fly?
In addition to persisting the data, I also wanted to perform some sample on-the-fly real-time analytics on the data. For this experiment I chose to perform word counting (or more exactly token counting, as a token can also be an expression or a combination of symbols).
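As a single-machine baseline, token counting can be sketched as below. The whitespace-based tokenizer and the lower-casing are assumptions made for the example (the class name `TokenCounting` is hypothetical too); splitting on whitespace keeps hashtags and emoticons intact as single tokens.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

/** Hypothetical single-JVM token counter, before any distribution is considered. */
final class TokenCounting {
    private TokenCounting() {
    }

    /** Splits on whitespace so that tokens such as "#bigdata" or ":-)" stay whole. */
    static List<String> tokenize(String text) {
        List<String> tokens = new ArrayList<>();
        for (String raw : text.trim().split("\\s+")) {
            if (!raw.isEmpty()) { // an empty input yields one empty string from split()
                tokens.add(raw.toLowerCase(Locale.ROOT));
            }
        }
        return tokens;
    }

    /** Counts how many times each token appears across all given tweets. */
    static Map<String, Integer> count(List<String> tweets) {
        Map<String, Integer> counts = new HashMap<>();
        for (String tweet : tweets) {
            for (String token : tokenize(tweet)) {
                counts.merge(token, 1, Integer::sum);
            }
        }
        return counts;
    }
}
```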
At first glance you may think it’s a simple task to implement, but when facing the volumes and throughput of Twitter you’ll quickly realize that we need a highly scalable and distributed architecture that uses an appropriate technology and that takes into account data sharding and consistency, data model de-normalization, processing reliability, message locality, throughput balancing to avoid backlog build-up etc.
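To make the sharding concern concrete: if the token itself serves as the routing key, every occurrence of a given token lands on the same partition, so its counter can be updated locally. The sketch below shows generic hash-based routing under that assumption; `TokenRouter` is a hypothetical name, and this is not the data grid's own routing implementation.

```java
/** Hypothetical hash-based router that maps each token to one of N partitions. */
final class TokenRouter {
    private final int partitions;

    TokenRouter(int partitions) {
        if (partitions <= 0) {
            throw new IllegalArgumentException("partitions must be positive");
        }
        this.partitions = partitions;
    }

    /** Returns a partition index in [0, partitions); floorMod handles negative hash codes. */
    int partitionFor(String token) {
        return Math.floorMod(token.hashCode(), partitions);
    }
}
```

One consequence of this design is that very frequent tokens concentrate load on a single partition, which is part of the throughput-balancing problem mentioned above.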
I chose to meet these challenges by employing Event-Driven Architecture (EDA) and designed a workflow to run through the different stages of the processing (parsing, filtering, persisting, local counting, global aggregation, etc.) where each stage of the workflow is a processor. To meet the above challenges of throughput, backlog build-up, distribution etc., I designed the processor with the following characteristics:
- Each processor has a thread pool (of a configurable size) to enable concurrent processing.
- Each processor thread can process events in batches (of a configurable size) to balance between input and output streams and avoid backlog build-up.
- Processors are co-located with the (sharded) data, so that most of the data processing is performed locally, within the same JVM, avoiding distributed transactions, network, and serialization overhead.
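The first two characteristics (a configurable thread pool and configurable batch sizes) can be sketched with plain `java.util.concurrent` classes. This is an illustrative stand-in and not the polling container used in the project; `BatchProcessor` and its method names are hypothetical, and an in-process queue takes the place of the co-located data partition.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical processor: a pool of workers, each handling events in bounded batches. */
final class BatchProcessor<E> {
    private final BlockingQueue<E> inbox = new LinkedBlockingQueue<>();
    private final ExecutorService workers;
    private final int poolSize;
    private final int batchSize;
    private final Consumer<List<E>> handler;
    private volatile boolean running = true;

    BatchProcessor(int poolSize, int batchSize, Consumer<List<E>> handler) {
        if (poolSize < 1 || batchSize < 1) {
            throw new IllegalArgumentException("poolSize and batchSize must be at least 1");
        }
        this.poolSize = poolSize;
        this.batchSize = batchSize;
        this.handler = handler;
        this.workers = Executors.newFixedThreadPool(poolSize);
    }

    /** Starts one drain loop per worker thread. */
    void start() {
        for (int i = 0; i < poolSize; i++) {
            workers.execute(this::drainLoop);
        }
    }

    void submit(E event) {
        inbox.add(event);
    }

    /** Waits briefly for one event, then grabs up to batchSize - 1 more without blocking. */
    private void drainLoop() {
        List<E> batch = new ArrayList<>(batchSize);
        try {
            while (running || !inbox.isEmpty()) {
                E first = inbox.poll(50, TimeUnit.MILLISECONDS);
                if (first == null) {
                    continue;
                }
                batch.add(first);
                inbox.drainTo(batch, batchSize - 1);
                handler.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the buffer
                batch.clear();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Stops accepting work, lets workers drain the queue, and reports whether they finished. */
    boolean shutdown() {
        running = false;
        workers.shutdown();
        try {
            return workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Larger batches amortize per-event overhead and help a slow stage keep up with a fast producer, at the cost of somewhat higher latency per event; that is the balance the configurable batch size is meant to tune.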
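The overall workflow chains the stages listed above, from parsing through global aggregation, with the events produced by each stage triggering the next one.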
For the implementation of the workflow and the processors I chose XAP Polling Container, which runs inside the in-memory data grid co-located with the data and enables easy implementation of the above characteristics.
The events that drive the workflow are simple POJOs that I listen on, and whose state changes trigger the events. This is a very useful characteristic of the XAP platform, which saved me the need to generate message objects and place them in a message broker.
For the implementation of atomic counters I used XAP’s Map API, which allows using the in-memory data grid as a transactional key-value store, where the key is the token and the value is the count. Each such entry can be locked individually to achieve atomic updates, very similarly to ConcurrentHashMap.
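Since the comparison is to `ConcurrentHashMap`, the per-entry atomic update can be illustrated with the JDK class itself. This is only an in-process analogy, not the data grid API; `TokenCounters` is a hypothetical name.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-process analogy for per-token atomic counters. */
final class TokenCounters {
    private final ConcurrentHashMap<String, Long> counts = new ConcurrentHashMap<>();

    /**
     * Atomically adds delta to the token's counter and returns the new value.
     * ConcurrentHashMap.merge performs the read-modify-write atomically per key,
     * so concurrent increments of the same token are never lost.
     */
    long increment(String token, long delta) {
        return counts.merge(token, delta, Long::sum);
    }

    /** Returns the current count, or 0 for a token that was never counted. */
    long get(String token) {
        return counts.getOrDefault(token, 0L);
    }
}
```

Updates to different tokens do not block each other here, which mirrors the benefit of locking each entry individually and not the whole store.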
Making it all play together in harmony
So we have a deployment that incorporates a feeder, a processor and a Cassandra persistent store, each such service having multiple instances and needing to scale in/out dynamically based on demand. Designing my solution for the real deal, I’m about to face tens to hundreds of instances of each service. Manual deployment or scripting will not be manageable, not to mention the automatic scaling of each service, monitoring and troubleshooting. How do I manage that automatically as a single cohesive solution?
For that I used GigaSpaces Cloudify, which allows me to integrate any stack of services by writing Groovy-based recipes describing declaratively the lifecycle of the application and its services.
I can then deploy and manage the end-to-end application using the CLI and the Web Console.
This was a thought exercise on real-time analytics for big data. I used the Twitter use case because I wanted to aim high on the big data challenge and, well, you can’t get much bigger than that.
The end-to-end solution included a clustered Cassandra NoSQL database for the elaborate batch analytics of the historical data, the GigaSpaces XAP platform for a distributed In-Memory Data Grid with co-located real-time processing, Spring Social for feeding in tweets from Twitter, Spring Framework for configuration and wiring capabilities, and GigaSpaces Cloudify for deployment, management and monitoring. I used event-driven architecture with semi-structured Documents, POJOs and atomic counters, and with write-behind eviction.
This is just the beginning. My design hardly utilized the capabilities of the chosen technology stack, and it barely scratched the surface of the analytics you can perform on Twitter. Imagine for example what it would take to calculate not just real-time word counts but also the reach of tweets, as done on the tweetreach service.
This project is just the starting point. I would like to share it with you and invite you to stand up to the challenge with me, so that together we make it into a complete reference solution for real-time analytics architecture for big data.
The project is available on GitHub at https://github.com/dotanh/rt-analytics.
You’re welcome to contribute!
Follow Dotan on Twitter!