Tuesday, October 26, 2010

Planet Hadoop...!!!

Hadoop

Hadoop is an open source Java platform. It lets one easily write and run distributed applications that process vast amounts of data on large computer clusters. It includes HDFS (the Hadoop Distributed File System) and an implementation of MapReduce.

HDFS:

HDFS is a distributed file system that runs on large clusters of commodity machines. Hadoop divides an application into many small blocks of work, and HDFS creates multiple replicas of the data blocks for reliability, placing them on compute nodes around the cluster. MapReduce can then process the data where it is located.

HDFS has a master / slave architecture. An HDFS cluster consists of a single NameNode, a master server that manages the file system namespace & regulates access to files by clients. In addition there are a number of DataNodes, usually one per node in the cluster, which manage the storage attached to the nodes that they run on. HDFS exposes a file system namespace & allows user data to be stored in files. Internally a file is split into one or more blocks & these blocks are stored on a set of DataNodes. The NameNode executes file system namespace operations like opening, closing & renaming files & directories. It also determines the mapping of blocks to DataNodes. The DataNodes are responsible for serving read & write requests from the file system's clients. The DataNodes also perform block creation, deletion & replication upon instruction from the NameNode.
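
To make the block and replica idea concrete, here is a small plain-Java sketch of how a file might be cut into blocks and how each block could be mapped to several DataNodes. It is only a toy model: the class BlockPlacementSketch, its method names and the round-robin placement are my own assumptions for illustration, and real HDFS placement is more sophisticated (it takes racks into account). The 64 MB block size and replication factor of 3 used in main() are assumed example values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model (hypothetical, not Hadoop code) of the block -> DataNode mapping
// that the NameNode keeps track of.
public class BlockPlacementSketch {

    /** Number of blocks needed for a file; the last block may be only partly filled. */
    public static long blockCount(long fileSize, long blockSize) {
        return (fileSize + blockSize - 1) / blockSize;
    }

    /**
     * Assigns each block to `replication` distinct DataNodes, chosen round-robin.
     * This placement rule is an assumption made for the sketch only.
     */
    public static List<List<String>> placeBlocks(long fileSize, long blockSize,
                                                 List<String> dataNodes, int replication) {
        if (replication > dataNodes.size()) {
            throw new IllegalArgumentException("replication exceeds number of DataNodes");
        }
        List<List<String>> placement = new ArrayList<>();
        long blocks = blockCount(fileSize, blockSize);
        for (long b = 0; b < blocks; b++) {
            List<String> replicas = new ArrayList<>();
            for (int r = 0; r < replication; r++) {
                // Consecutive nodes, so the replicas of one block never share a node.
                replicas.add(dataNodes.get((int) ((b + r) % dataNodes.size())));
            }
            placement.add(replicas);
        }
        return placement;
    }

    public static void main(String[] args) {
        long mb = 1024L * 1024L;
        List<String> nodes = Arrays.asList("datanode1", "datanode2", "datanode3", "datanode4");
        List<List<String>> placement = placeBlocks(200 * mb, 64 * mb, nodes, 3);
        for (int i = 0; i < placement.size(); i++) {
            System.out.println("block " + i + " -> " + placement.get(i));
        }
    }
}
```

With these assumed numbers a 200 MB file needs four blocks, and losing any single DataNode still leaves two copies of every block.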


MapReduce:
 
MapReduce breaks a problem into independent pieces that can be worked on in parallel. It is an abstraction, first described by Google, that lets engineers express simple computations while the library hides the details of parallelisation, data distribution, load balancing & fault tolerance.

MapReduce as a Programming Model:

MAP: it is written by a user of the MapReduce library. It takes an input pair & produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key and passes them to the Reduce function.

REDUCE: it is also written by the user. It accepts an intermediate key & a set of values for that key, and merges these values together to form a possibly smaller set of values.

The MapReduce framework consists of a single master JobTracker and one slave TaskTracker per cluster node. The master is responsible for scheduling the job's component tasks on the slaves, monitoring them and re-executing the failed tasks. The slaves execute the tasks as directed by the master.

The Hadoop job client submits the job (jar / executables) & configuration to the JobTracker, which then assumes the responsibility of distributing the software / configuration to the slaves, scheduling tasks & monitoring them, and providing status & diagnostic information to the job client.
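
The Map and Reduce roles described above can be tried out without a cluster. The following is a minimal single-process sketch in plain Java, assuming a word-count problem; the class MapReduceSketch and its run() method are hypothetical names of mine that stand in for the framework's grouping step, and nothing here uses the Hadoop API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Single-process illustration of map -> group by key -> reduce (not Hadoop code).
public class MapReduceSketch {

    /** Map: one input line produces a list of (word, 1) pairs. */
    public static List<Map.Entry<String, Integer>> map(String line) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String word : line.trim().split("\\s+")) {
            if (!word.isEmpty()) {
                out.add(new SimpleEntry<>(word, 1));
            }
        }
        return out;
    }

    /** Reduce: one key plus all of its values produces a single sum. */
    public static int reduce(String key, List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    /** Stand-in for the framework: run map, group the values by key, run reduce. */
    public static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            for (Map.Entry<String, Integer> pair : map(line)) {
                grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>())
                       .add(pair.getValue());
            }
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello hadoop", "hello again");
        System.out.println(run(lines));
    }
}
```

In a real cluster the map calls run on many nodes and the grouping step moves data over the network, but the contract between the two user-written functions is the same.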

PIG : A dataflow language and execution environment for exploring very large datasets. Pig runs on HDFS and MapReduce clusters.

HBASE : A distributed column-oriented database. HBase uses HDFS for its underlying storage and supports batch-style computations using MapReduce & point queries.

ZooKeeper : A distributed, highly available coordination service. ZooKeeper provides primitives such as distributed locks that can be used for building distributed applications.

Hive : A distributed data warehouse. Hive manages data stored in HDFS & provides a query language based on SQL for querying the data.

Chukwa : A distributed data collection and analysis system. Chukwa runs collectors that store data in HDFS and it uses MapReduce to produce reports.


Hadoop Default Ports (web interfaces) :

1) HDFS :   NameNode                 50070   dfs.http.address
            DataNode                 50075   dfs.datanode.http.address
            Secondary NameNode       50090   dfs.secondary.http.address
            Backup/Checkpoint node   50105
2) MR :     JobTracker               50030   mapred.job.tracker.http.address
            TaskTracker              50060   mapred.task.tracker.http.address



Hadoop Installation On Linux:

Installing Java:

Before installing Hadoop on Linux we need to install Java 6 or above.
It is available at http://java.sun.com/products/archive/j2se/6u12
I am using jdk-6u12-linux-i586.bin.
You can choose the 32-bit or 64-bit version to match your hardware and operating system.
I am currently using Red Hat Linux 5 (32 bit).

Download the Java package, then make it executable and run it:
# chmod 744 jdk-6u12-linux-i586.bin
# ./jdk-6u12-linux-i586.bin


After that, set the JAVA_HOME environment variable:
# export JAVA_HOME=<build tool directory>/jdk1.6.0_12

We also need the JCE policy files.
Download jce_policy-6.zip, extract it and copy the JAR files into the JRE's security directory:
# unzip jce_policy-6.zip
# cp -f jce/*.jar $JAVA_HOME/jre/lib/security/


# chmod 744 $JAVA_HOME/jre/lib/security/*.jar

Installing Hadoop:

Download the hadoop-0.20.2.tar.gz file from
http://www.higherpass.com/linux/

or fetch it from the command line:
# wget http://www.gtlib.gatech.edu/pub/apache/hadoop/core/hadoop-0.20.2

After downloading the tar file, extract it:

# tar -xvzf hadoop-0.20.2.tar.gz

Then copy the Hadoop installation folder to '/usr/local':
# cp -r hadoop-0.20.2/ /usr/local

Hadoop Setup :

Set the HADOOP_HOME environment variable to the install directory & append $HADOOP_HOME/bin to the PATH environment variable.

# export HADOOP_HOME=/usr/local/hadoop-0.20.2
# export PATH=$PATH:$HADOOP_HOME/bin


Now set JAVA_HOME in the '/usr/local/hadoop-0.20.2/conf/hadoop-env.sh' file.

Hadoop Pseudo Distributed Cluster:

1) Edit $HADOOP_HOME/conf/core-site.xml and change it to:

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:9000</value>
</property>
</configuration>

2) Edit $HADOOP_HOME/conf/hdfs-site.xml and change it to:

<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>

3) Edit $HADOOP_HOME/conf/mapred-site.xml and change it to:

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>localhost:9001</value>
</property>
</configuration>

After that, set up passwordless ssh with keys. The Hadoop start-up scripts use ssh to start the daemons on each node, so we need an ssh key with an empty passphrase.

# ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa

# cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys



Format the HDFS file system to prepare it for use. This creates the needed directory structures for the HDFS filesystem.

# hadoop namenode -format

Now that everything is configured & set up, start the daemons with the command:

# start-all.sh


Running a Job in Cluster Mode:

1) Create a simple file, say "test.txt"

# vim test.txt
hello this is a sample file for hadoop
hello this file contains simple data.

Save this file and exit

2) Insert this file into hdfs
# hadoop dfs -put test.txt /

The wordcount program is present at '/usr/local/hadoop-0.20.2/src/examples/org/apache/hadoop/examples/WordCount.java'

The Program WordCount.java is:

package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }
    Job job = new Job(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}



And the jar file is present at '/usr/local/hadoop-0.20.2/hadoop-0.20.2-examples.jar'

To run the program, give the command:

# hadoop jar hadoop-0.20.2-examples.jar wordcount /test.txt /out

You will get output like the following on screen:

10/10/27 10:26:16 INFO input.FileInputFormat: Total input paths to process : 1
10/10/27 10:26:16 INFO mapred.JobClient: Running job: job_201010270916_0005
10/10/27 10:26:17 INFO mapred.JobClient: map 0% reduce 0%
10/10/27 10:26:27 INFO mapred.JobClient: map 100% reduce 0%
10/10/27 10:26:39 INFO mapred.JobClient: map 100% reduce 100%
10/10/27 10:26:41 INFO mapred.JobClient: Job complete: job_201010270916_0005
10/10/27 10:26:41 INFO mapred.JobClient: Counters: 17
10/10/27 10:26:41 INFO mapred.JobClient: Job Counters
10/10/27 10:26:41 INFO mapred.JobClient: Launched reduce tasks=1
10/10/27 10:26:41 INFO mapred.JobClient: Launched map tasks=1
10/10/27 10:26:41 INFO mapred.JobClient: Data-local map tasks=1
10/10/27 10:26:41 INFO mapred.JobClient: FileSystemCounters
10/10/27 10:26:41 INFO mapred.JobClient: FILE_BYTES_READ=50
10/10/27 10:26:41 INFO mapred.JobClient: HDFS_BYTES_READ=20
10/10/27 10:26:41 INFO mapred.JobClient: FILE_BYTES_WRITTEN=132
10/10/27 10:26:41 INFO mapred.JobClient: HDFS_BYTES_WRITTEN=28
10/10/27 10:26:41 INFO mapred.JobClient: Map-Reduce Framework
10/10/27 10:26:41 INFO mapred.JobClient: Reduce input groups=4
10/10/27 10:26:41 INFO mapred.JobClient: Combine output records=4
10/10/27 10:26:41 INFO mapred.JobClient: Map input records=1
10/10/27 10:26:41 INFO mapred.JobClient: Reduce shuffle bytes=50
10/10/27 10:26:41 INFO mapred.JobClient: Reduce output records=4
10/10/27 10:26:41 INFO mapred.JobClient: Spilled Records=8
10/10/27 10:26:41 INFO mapred.JobClient: Map output bytes=36
10/10/27 10:26:41 INFO mapred.JobClient: Combine input records=4
10/10/27 10:26:41 INFO mapred.JobClient: Map output records=4
10/10/27 10:26:41 INFO mapred.JobClient: Reduce input records=4



The output is stored in the /out/part-r-00000 file (or /out/part-00000 when the job uses the older API). The output directory must not exist before the program runs; otherwise you will get an error.

To remove a file from HDFS, give the command:
# hadoop dfs -rm /test.txt

To remove a directory and everything under it recursively, give the command below. Note that '/' here is the HDFS root, so this deletes everything stored in HDFS:
# hadoop dfs -rmr    /

To check the output, give the command:
# hadoop dfs -cat /out/part-00000
or
# hadoop dfs -cat /out/part-r-00000

You get the following data stored in it:
a 1
contains 1
data. 1
file 2
for 1
hadoop 1
hello 2
is 1
sample 1
simple 1
this 2



To check contents of HDFS file system give command:

# hadoop dfs -ls

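Explanation of WordCount.java :

The program needs 3 classes to run: a Mapper, a Reducer and a Driver. The Driver tells Hadoop how to run the MapReduce process. The Mapper and Reducer operate on the data. Note that several names used below (JobConf, JobClient, OutputCollector, Reporter) belong to the older org.apache.hadoop.mapred API, whereas WordCount.java above is written against the newer org.apache.hadoop.mapreduce API, where the Job and Context classes play the corresponding roles.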

1) Mapping List : The first phase of a MapReduce program is 'Mapping'. A list of data elements is provided, one at a time, to a function called the 'Mapper', which transforms each element individually into an output data element.

2) Reducing List : Reducing here means aggregating values together and returning a single output.

3) Keys and Values : the mapping & reducing functions receive not just values, but (key, value) pairs. The default input format used by Hadoop presents each line of an Input file as a separate input to the Mapper function and not the entire file at a time.

4) StringTokenizer object : used to break up the line into words.

5) OutputCollector.collect() (context.write() in the newer API used above) : this method copies the values it receives as input, so we are free to overwrite the variables we use.

6) Driver Method : there is one final component of a Hadoop MapReduce program called, ' Driver'. The Driver initiates the job & instructs the Hadoop platform to execute your code on a set of input files & controls where the output files are placed.

7) InputPath Argument : given input directory.

8) OutputPath Argument : Directory in which output from reducers are written into files.

9) JobClient and JobConf objects : the JobConf captures the configuration information needed to run the job, and the JobClient submits it. The mapping and reducing functions are identified by the setMapperClass() & setReducerClass() methods.
Data types emitted by the reducer are identified by setOutputKeyClass() and setOutputValueClass(). By default these are assumed to be the output types of the mapper as well. If this is not the case, the setMapOutputKeyClass() and setMapOutputValueClass() methods of the JobConf class will override these. The input types fed to the Mapper are controlled by the InputFormat used. The default InputFormat, "TextInputFormat", will load data in as (LongWritable, Text) pairs. The long value is the byte offset of the line in the file. The Text object holds the string contents of the line of the file.
The call to JobClient.runJob(conf) will submit the job to MapReduce. This will block until the job completes. If the job fails, it will throw an IOException. JobClient also provides a non-blocking version called 'submitJob()'.

10) InputFormat : it is a class that provides the following functionality:
     a) Selects the files or other objects that should be used for input.
     b) Defines the InputSplits that break a file into tasks.
     c) Provides a factory for RecordReader objects that read the file.
         e.g. FileInputFormat : it is provided with a path containing files to read. The FileInputFormat will read all files in this directory and then divide each of them into one or more InputSplits. We can choose which InputFormat to apply to our input files for a job by calling the setInputFormat() method of the JobConf object that defines the job.

11) Input Split : An InputSplit describes a unit of work that comprises a single map task in a MapReduce program.

12) RecordReader : the InputSplit has defined a slice of work, but does not describe how to access it. The RecordReader class actually loads the data from its source & converts it into (key,value) pairs suitable for reading by the Mapper. The RecordReader instance is defined by the InputFormat. The default InputFormat, TextInputFormat, provides a LineRecordReader, which treats each line of the input file as a new value. The RecordReader is invoked repeatedly on the input until the entire InputSplit has been consumed. Each invocation of the RecordReader leads to another call to the map() method of the Mapper.

13) Mapper : Given a key and value the map() method emits (key,value) pairs which are forwarded to the Reducers. The map() method receives 2 parameters in addition to the key & the value.
       a) the OutputCollector object has a method named collect() which will forward a (key,value) pair to the reduce phase of the job.
       b) the Reporter object provides information about the current task; its getInputSplit() method will return an object describing the current InputSplit. The setStatus() method allows you to emit a status message back to the user. The incrCounter() method allows you to increment shared performance counters. Each Mapper can increment the counters & JobTracker will collect the increments made by different processes & aggregate them for later retrieval when the job ends.

14) Partition and Shuffle : the process of moving map outputs to the reducers is known as 'Shuffling'. The Partitioner class determines which partition a given (key, value) pair will go to.

15) Sort : each reduce task is responsible for reducing the values associated with several intermediate keys. The set of intermediate keys on a single node is automatically sorted before it is presented to the Reducer.

16) Reducer : a Reducer instance is created for each reduce task. For each key in the partition assigned to a Reducer, the Reducer's reduce() method is called once. The Reducer also receives as parameters the OutputCollector and Reporter objects; they are used in the same manner as in the map() method.

17) OutputFormat : the (key,value) pairs provided to the OutputCollector are then written to output files. The instances of OutputFormat provided by Hadoop write to files on the local disk or in HDFS. The output directory is set by the FileOutputFormat.setOutputPath() method. We can control which particular OutputFormat is used by calling the setOutputFormat() method of the JobConf object that defines the MapReduce job.

18) RecordWriter : these are used to write the individual records to the files as directed by OutputFormat.

19) Combiner : it runs after the Mapper & before the Reducer, on the node that ran the map task. Its usage is optional.
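
The partition and sort steps from the list above can be sketched in a few lines of plain Java. As far as I know, Hadoop's default HashPartitioner uses the same "hash modulo number of reduce tasks" formula shown here, but this sketch applies it to String.hashCode() rather than to Hadoop's Text keys, so the actual partition numbers on a cluster will differ. PartitionSketch and its methods are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;

// Plain-Java illustration of partitioning and per-partition sorting (not Hadoop code).
public class PartitionSketch {

    /** Chooses the reduce task for a key: non-negative hash modulo the task count. */
    public static int partitionFor(String key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    /** Groups keys by partition; each TreeSet keeps its keys in sorted order. */
    public static List<TreeSet<String>> shuffle(List<String> keys, int numReduceTasks) {
        List<TreeSet<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numReduceTasks; i++) {
            partitions.add(new TreeSet<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numReduceTasks)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("hello", "this", "is", "a", "sample", "file");
        List<TreeSet<String>> partitions = shuffle(keys, 2);
        for (int i = 0; i < partitions.size(); i++) {
            System.out.println("reducer " + i + " receives " + partitions.get(i));
        }
    }
}
```

The important property is that the same key always lands in the same partition, so all values for one word end up at one Reducer.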

                       WordCount.java Program Description

a) Class TokenizerMapper : this class has the method -
I) map (Object key, Text value, Mapper.Context context)
It is called once for each key / value pair in the input split.
II) StringTokenizer class : it is used to split the string into words.

b) Class IntSumReducer : it has the method -
reduce (Text key, Iterable<IntWritable> values, Reducer.Context context)
This method is called once for each key. Most applications will define their reduce class by overriding this method.

c) Class GenericOptionsParser : It is a utility to parse command line arguments generic to the Hadoop framework. This class recognizes several standard command line arguments, enabling applications to specify a namenode, a jobtracker, additional configuration resources etc.

d) GenericOptionsParser (Configuration conf, String [] args) : this creates a GenericOptionsParser to parse only the generic Hadoop arguments.
Method : getRemainingArgs () : It returns an array of strings containing only application specific arguments.

e) Job class and related methods :
I) FileInputFormat.addInputPath (Job job, Path path) : add a path to the list of inputs for the map-reduce job.
II) FileOutputFormat.setOutputPath (Job job, Path outputDir) : set the path of the output directory for the map-reduce job.
III) setMapperClass() : set the application's Mapper class.
IV) setCombinerClass() : set the combiner class for the job.
V) setJarByClass() : set the job's JAR by finding where a given class came from.
VI) setReducerClass() : set the Reducer for the job.
VII) setOutputKeyClass() : set the key class for the job output data.
VIII) setOutputValueClass() : set the value class for the job outputs.
IX) waitForCompletion() : submit the job to the cluster and wait for it to finish.
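
WordCount registers IntSumReducer through setCombinerClass() as well as setReducerClass(). That works because summing is associative and commutative, so partial sums computed per map task can be summed again by the reducer. A rough plain-Java sketch of the effect follows; CombinerSketch and its methods are made-up names for illustration and do not use the Hadoop API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrates local pre-aggregation (combiner) before the final reduce (not Hadoop code).
public class CombinerSketch {

    /** Combiner: sums the (word, 1) pairs produced by ONE map task before they leave the node. */
    public static Map<String, Integer> combine(List<String> mapOutputWords) {
        Map<String, Integer> partial = new TreeMap<>();
        for (String word : mapOutputWords) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    /** Reducer: sums the partial counts arriving from all map tasks. */
    public static Map<String, Integer> reduce(List<Map<String, Integer>> partials) {
        Map<String, Integer> total = new TreeMap<>();
        for (Map<String, Integer> partial : partials) {
            for (Map.Entry<String, Integer> e : partial.entrySet()) {
                total.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Words emitted by two hypothetical map tasks.
        Map<String, Integer> task1 = combine(Arrays.asList("hello", "this", "hello"));
        Map<String, Integer> task2 = combine(Arrays.asList("hello", "file"));
        System.out.println("after combine: " + task1 + " and " + task2);
        System.out.println("after reduce:  " + reduce(Arrays.asList(task1, task2)));
    }
}
```

The first map task here sends two records across the network instead of three; on real data with many repeated words the saving is much larger.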

                           Simple Map Reduce Program

map (String key, String value) :
    // key : document name
    // value : document contents
    for each word x in value :
        EmitIntermediate (x, "1");

reduce (String key, Iterator values) :
    // key : a word
    // values : a list of counts
    int result = 0;
    for each v in values :
        result += ParseInt (v);
    Emit (AsString (result));

i.e. the map() emits each word plus an associated count of occurrences (just "1" in this simple example).
The reduce() sums together all counts emitted for a particular word.
In addition we need to write code to fill a mapreduce specification object with the names of the input and output files, and optional tuning parameters. The user then invokes "mapreduce()", passing it the specification object. The user's code is linked together with the MapReduce library.

Inverted Index : The map function parses each document and emits a sequence of (word, document Id) pairs. The reduce function accepts all pairs for a given word, sorts the corresponding document Ids and emits a (word, list (document Id)) pair. The set of all output pairs forms a single inverted index.
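
The inverted index described above can be sketched in plain Java as follows. This is a single-process illustration under my own assumptions: words are lower-cased and split on non-word characters, and a document ID is listed once per word even if the word occurs several times. InvertedIndexSketch and build() are hypothetical names, not part of Hadoop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Single-process illustration of building an inverted index (not Hadoop code).
public class InvertedIndexSketch {

    /** Takes documentId -> text and returns word -> sorted list of document IDs. */
    public static Map<String, List<String>> build(Map<String, String> documents) {
        // "Map" phase plus grouping: collect (word, documentId) pairs per word.
        Map<String, TreeSet<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, String> doc : documents.entrySet()) {
            for (String word : doc.getValue().toLowerCase().split("\\W+")) {
                if (word.isEmpty()) {
                    continue;
                }
                grouped.computeIfAbsent(word, k -> new TreeSet<>()).add(doc.getKey());
            }
        }
        // "Reduce" phase: the TreeSet already holds the IDs sorted, so emit them as a list.
        Map<String, List<String>> index = new TreeMap<>();
        for (Map.Entry<String, TreeSet<String>> e : grouped.entrySet()) {
            index.put(e.getKey(), new ArrayList<>(e.getValue()));
        }
        return index;
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("doc1", "Hadoop stores data");
        docs.put("doc2", "Hadoop processes data fast");
        System.out.println(build(docs));
    }
}
```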




                      Designing  HADOOP CLUSTER

You need 3 Linux machines to build a cluster. (You can even build it on 2 machines, but 3 are better.)

The root of the distribution is referred to as HADOOP_HOME. All machines in the cluster must have the same HADOOP_HOME path. Export HADOOP_HOME in the login script /etc/bashrc to make it persistent.

Hadoop configuration is driven by 2 configuration files in the HADOOP_HOME/conf directory. The default configuration settings appear in the read-only 'hadoop-default.xml' file. Node specific configuration settings appear in 'hadoop-site.xml'. (From Hadoop 0.20 onwards, hadoop-site.xml is split into core-site.xml, hdfs-site.xml and mapred-site.xml, as used in the pseudo-distributed setup above.)

Another important file is conf/slaves. On the JobTracker this file lists all the hosts on which the TaskTracker daemon has to be started. On the NameNode it lists all the hosts on which the DataNode daemon has to be started. You must maintain this file manually, even if you are scaling up to a large number of nodes.

Finally, conf/hadoop-env.sh contains configuration options such as JAVA_HOME, the location of logs & the directory where process IDs are stored. In this setup the NameNode & JobTracker run on 2 separate nodes, and the DataNode & TaskTracker run on a third node. [In case of 2 machines you can install the NameNode & JobTracker on one machine and the DataNode & TaskTracker on the second machine.]

The conf/slaves file on the first 2 nodes contained the IP address of the 3rd machine (the DataNode / TaskTracker). All 4 daemons used the same conf/hadoop-site.xml file, specifically hadoop-site-namenode.xml.

Set Up Passphraseless SSH :

If you want to use hostnames to refer to the nodes, you must also edit the /etc/hosts file on each node to reflect the proper mapping between the hostnames & Ip addresses.

Hadoop Startup :
To start the cluster you need to start both HDFS & MapReduce.

1) First, on the NameNode, navigate to HADOOP_HOME.
2) Format a new distributed filesystem using,
$ hadoop namenode -format
3) Start HDFS by running the following command on the NameNode,
$ start-dfs.sh
This script also consults the conf/slaves file on the NameNode and starts the DataNode daemon on all the listed slaves.
4) Start MapReduce with the following command on the designated JobTracker,
$ start-mapred.sh
This script also consults the conf/slaves file on the JobTracker & starts the TaskTracker daemon on all the listed slaves.
5) To cross-check whether the cluster is running properly, you can look at the processes running on each node using jps. On the NameNode you should see the processes Jps, NameNode and, if you have only a 3 node cluster, SecondaryNameNode.
6) On the JobTracker, check for Jps and JobTracker.
7) On the TaskTracker / DataNode you should see Jps, DataNode and TaskTracker.

Running MapReduce Jobs:
  
Once you have the Hadoop cluster running, you can see it in action by executing one of the example MapReduce Java classes bundled in hadoop-0.20.2-examples.jar. As an example, take Grep, which extracts matching strings from text files and counts how many times they occurred. To begin, create an input set for Grep. In this case the input will be the set of files in the conf directory.

# hadoop dfs -copyFromLocal conf input
# hadoop dfs -ls
# hadoop jar hadoop-0.20.2-examples.jar grep input grep-output 'dfs[a-z]+'
# hadoop dfs -get grep-output output
# cd output
# ls
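
To see what the Grep example computes, here is a small plain-Java sketch that extracts every match of a regular expression from some lines and counts how often each matched string occurs. It only imitates the result of the job in a single process; GrepSketch and countMatches() are hypothetical names, and the sample lines in main() are invented for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Single-process imitation of the Grep example's result (not Hadoop code).
public class GrepSketch {

    /** Returns matched string -> number of occurrences across all lines. */
    public static Map<String, Integer> countMatches(List<String> lines, String regex) {
        Pattern pattern = Pattern.compile(regex);
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher matcher = pattern.matcher(line);
            while (matcher.find()) {
                counts.merge(matcher.group(), 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "run dfsadmin -report",
                "dfsadmin and dfsclient",
                "dfs.replication");
        System.out.println(countMatches(lines, "dfs[a-z]+"));
    }
}
```

Note that 'dfs[a-z]+' requires at least one lower-case letter directly after "dfs", so a property name such as dfs.replication does not match.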


 Some Important Hadoop Commands

# start-all.sh
This command starts all the daemons (JobTracker, NameNode, TaskTracker, DataNode etc.) on the Hadoop cluster.

# hadoop dfs -put sample.txt input
This command inserts the 'sample.txt' file into HDFS (the /user/root/input folder when run as root).

# hadoop dfs -rm input/sample.txt
Used to delete sample.txt from HDFS.
To remove the input directory and all files in it recursively, give the command :
# hadoop dfs -rmr input
To run a Java program on Hadoop, give the command :
# hadoop jar hadoop-0.20.2-examples.jar wordcount input/sample.txt /out
Because the output path /out is absolute, the output will be stored in the '/out/part-00000' file or the '/out/part-r-00000' file.

