Friday, November 12, 2010

Building Simple MapReduce java Program

         Building Simple MapReduce java Program

Map Reduce is a combination of two functions map() and reduce().

Main class for a simple MapReduce Java Application :

public class Main
{
public static void main (String ap[])
{
MyMapReduce my = new MyMapReduce();
my.init ();
}
}

It just instantiates a class called, 'MyMapReduce'.

MapReduce Program for Factorial :


import java.io.IOException;
import java.util.*;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;


public static class Map extends MapReduceBase implements Mapper <LongWritable, Text, Text, Text>
          {
             private Text word = new Text();
             private final static Text location = new Text();
             public void map(LongWritable key, Text value, OutputCollector <Text,                                         Text> output, Reporter reporter) throws IOException
               {
                   String line = value.toString();
                   StringTokenizer tokenizerLine = new StringTokenizer(Line, “\n”);
                   Text T1 = new Text();
                   Text t2 = new Text();
                   int num;
                   while (tokenizerLine.hasmoreTokens())
                      {
                          String tokenAsLine = tokenizerLine.nextToken();
                          StringTokenizer tokenizerWord = new StringTokenizer                                                                      (tokenAsLine);
                          List s1 = new ArrayList();
                          while (tokenizerLine.hasMoreTokens())
                               {
                                     String tokenAsLine = tokenizerLine.nextToken();
                                     StringTokenizer tokenizerWord = new StringTokenizer                                                                              (tokenAsList);
                                     List s1=new ArrayList();
                                     while (tokenizerWord.hasMoreTokens())
                                         {
                                             s1.add(tokenizerWord.nextToken());
                                         }
                                     for(int i=0; i<=(s1.size()-1); i++)
                                         {
                                             num = Integer.parseInt((String)s1.get(i));
                                             int fact=1;
                                     for (int j=1 ; j>= num ; j++)
                                        {
                                            fact = fact * j;
                                        }
                                    t1.set((String)s1.get(i));
                                    t2.set(“ ” + fact);
                                    output.collect(t1 , t2);
                                 }
                           }
                    }
             }


public static class Reduce extends MapReduceBase implements Reducer <Text,                                              Text, Text, Text>
          {
                public void reduce (Text key, Iterator <Text> values, outputCollector                          <Text, Text> output, Reporter reporter) throws IOException
                     {
                           boolean first = true;
                           StringBuilder toReturn = new StringBuilder();
                           while (values.hasNext())
                                {
                                    if(!first)
                                    toReturn.append(“ , ”);
                                    first = false;
                                    toReturn.append(values.next().toString());
                                }
                    }
          }


public static void main(String ap[])
{
    JobConf conf= new JobConf (Factorial.class);
    conf.setJobName(“factorial”);
    conf.setOutputKeyClass(Text.class);
    conf.setMapperClass(map.class);
    conf.setReducerClass(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
    FileInputFormat.setInputPaths(conf, new Path(ap[0]));
    FileOutputFormat.setOutputPath(conf, new Path (ap[1]));
    try
      {
          conf.set(“io.sort.mb”, “10”);
          JobClient.runJob(conf);
      }
    catch(IOException e)
       {
           System.err.println(e.getMessage());
       }
}

No comments:

Post a Comment