Wednesday, March 30, 2016

How to Integrating Cassandra with Hadoop ?

Hadoop is a set of open source projects that deal with large amounts of data in a distributed way.


Note: The ''word count'' example given in this section is also found in the Cassandra source download in its contrib module. It can be compiled and run using instructions found there.



Cassandra has a Java source package for Hadoop integration code, called org.apache.cassandra.hadoop

Running the Word Count Example

It takes a body of text and counts the occurrences of each distinct word. Here we provide some code to perform a word count over data contained in Cassandra.


The Mapper class
IColumn column = columns.get(columnName.getBytes());
String value = new String(column.value());
StringTokenizer itr = new StringTokenizer(value);
while (itr.hasMoreTokens()) {
 word.set(itr.nextToken()); // Text word
 context.write(word, one); // IntWritable one
}

The Reducer Class
int sum = 0;
for (IntWritable val : values) {
 sum += val.get();
}
result.set(sum);
context.write(key, result);

Our MapReduce Program
Job job = new Job(getConf(), “wordcount”);
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setInputFormatClass(ColumnFamilyInputFormat.class);

FileOutputFormat.setOutputPath(job, new Path(“/tmp/word_count”));

ConfigHelper.setThriftContact(job.getConfiguration(), “localhost”, 9160);
ConfigHelper.setInputColumnFamily(job.getConfiguration(), “Keyspace1”, “Standard1”);
SlicePredicate predicate = new    SlicePredicate().setColumn_names(Arrays.asList(columnName.getBytes()));
ConfigHelper.setInputSlicePredicate(job.getConfiguration(), predicate);
job.waitForCompletion(true);
return 0;

Outputting Data to Cassandra
For updates on the built-in output format, see http://wiki.apache.org/cassandra/HadoopSupport

It is possible, however, to write directly to Cassandra via Thrift (or a higher-level client) in the Reducer step. In the previous example, this means that instead of writing to the context, one could write its key and value to Cassandra directly.

Tools Above MapReduce
MapReduce is a great abstraction for developers so that they can worry less about the details of distributed computing and more about the problems they are trying to solve. Over time, an even more abstracted toolset has emerged. Pig and Hive operate at a level above MapReduce and allow developers to perform more complex analytics more easily. Both of these frameworks can operate against data in Cassandra.

Pig
Pig is a platform for data analytics developed at Yahoo!.
Included in the platform is a high-level language called Pig Latin and a compiler that translates programs written in Pig Latin into sequences of MapReduce jobs.

To write our word count example using Pig Latin:
LOAD 'cassandra://Keyspace1/Standard1' USING CassandraStorage() \
as (key:chararray, cols:bag{col:tuple(name:bytearray, value:bytearray)});
cols = FOREACH rows GENERATE flatten(cols) as (name, value);
words = FOREACH cols GENERATE flatten(TOKENIZE((chararray) value)) as word;
grouped = GROUP words BY word;
counts = FOREACH grouped GENERATE group, COUNT(words) as count;
ordered = ORDER counts BY count DESC;
topten = LIMIT ordered 10;
dump topten;

Line 1 gets all the data in the Standard1 column family, describing that data with aliases and data types. We extract the name/value pairs in each of the rows. In line 3, we have to cast the value to a character array in order to tokenize it with the built-in TOKENIZE function. We next group by and count each word instance. Finally, we order our data by count and output the top 10 words found.

Pig provides an abstraction that makes our code significantly more concise. Pig also allows programmers to express operations such as joins much more simply than by using MapReduce alone.

Hive
Like Pig, Hive is a platform for data analytics. Instead of a scripting language, queries are written in a query language similar to the familiar SQL called Hive-QL. Hive was developed by Facebook to allow large data sets to be abstracted into a common structure.

Cluster Configuration: Cassandra & Hadoop
Because Hadoop has some unfamiliar terminology, here are some useful definitions:
HDFS
Hadoop distributed filesystem.
Namenode
The master node for HDFS. It has locations of data blocks stored in several datanodes and often runs on the same server as the jobtracker in smaller clusters.
Datanode
Nodes for storing data blocks for HDFS. Datanodes run on the same servers as tasktrackers.
Jobtracker
The master process for scheduling MapReduce jobs. The jobtracker accepts new jobs, breaks them into map and reduce tasks, and assigns those tasks to tasktrackers in the cluster. It is responsible for job completion. It often runs on the same server as the namenode in smaller clusters.
Tasktracker
The process responsible for running map or reduce tasks from the jobtracker. Tasktrackers run on the same servers as datanodes.


Like Cassandra, Hadoop is a distributed system. The MapReduce jobtracker spreads tasks across the cluster, preferably near the data it needs. When a jobtracker initiates tasks, it looks to HDFS to provide it with information about where that data is stored. Similarly, Cassandra’s built-in Hadoop integration provides the jobtracker with data locality information so that tasks can be close to the data. In order to achieve this data locality, Cassandra nodes must also be part of a Hadoop cluster. The namenode and jobtracker can reside on a server outside your Cassandra cluster. Cassandra nodes will need to be part of the cluster by running a tasktracker process on each node. Then, when a MapReduce job is initiated, the jobtracker can query Cassandra for locality of the data when it splits up the map and reduce tasks.


A four-node Cassandra cluster with tasktracker processes running on each Cassandra node is shown in Figure 12-1. At least one node in the cluster needs to be running the datanode process. There is a light dependency on HDFS for small amounts of data (the distributed cache), and a single datanode should suffice. External to the cluster is the server running the Hadoop namenode and jobtracker.






When a job is initiated from a client machine, it goes to the jobtracker. The jobtracker receives information about the data source from when the job is submitted, via the configuration options mentioned earlier. At that point, it can use Cassandra’s Column FamilyRecordReader and ColumnFamilySplit to get the physical locations of different segments of data in the cluster. Then, it can use that location to split up the tasks among the nodes in the cluster, preferring to run tasks on nodes where the associated data resides.


Finally, when creating jobs for MapReduce to execute (either directly or via Pig), the Hadoop configuration needs to point to the namenode/jobtracker (in the Hadoop configuration files) and the Cassandra configuration options. The cluster will be able to handle the integration from there.


Note: As far as configuration, Keith Thornhill from Raptr.com has a separate namenode/jobtracker and installed the datanode/tasktracker on each of his Cassandra nodes. He notes that a nice side effect of this is that the analytics engine scales with the data.



In this chapter, we examined Hadoop, the open source implementation of the Google MapReduce algorithm. We took a brief look at Hadoop basics and how to integrate it with Cassandra, and
also saw how to use Pig, a simple and concise language to express MapReduce jobs.

No comments:

Post a Comment