CSC 5003 – Semantic Web And Big Data Architecture

CI2 : Hadoop And MapReduce

Manipulate the Hadoop File System and write MapReduce tasks.

Install Hadoop (∼20mn – easy)

We will start by installing a local version of Hadoop to practice on. You need a Unix system with Java installed. Then, execute the following commands. Note that your JAVA_HOME may differ.
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
wget https://dlcdn.apache.org/hadoop/common/stable/hadoop-3.3.6.tar.gz
tar -xvf hadoop-3.3.6.tar.gz
cd hadoop-3.3.6
mkdir hdfs

Throughout the lab, we will work from the directory hadoop-3.3.6.
You can check that your installation works by running bin/hadoop. If you add the bin directory to your PATH variable, you can write hadoop instead of bin/hadoop, as we saw in the lecture. Since this is the local version of Hadoop, it shares the filesystem of the OS. However, the Hadoop commands work the same way.
We created a directory hdfs in which we will store all the files we need. Using only Hadoop commands, create two directories, input and output. The path must start with hdfs. Check that you see the two new directories (still with Hadoop commands).
From now on, you must never use Unix commands to access files in the hdfs directory. You are only allowed to use Hadoop commands.
bin/hadoop fs -mkdir hdfs/input
bin/hadoop fs -mkdir hdfs/output
bin/hadoop fs -ls hdfs

Follow this link and download the book Peter Pan. Copy this file to HDFS, into the input directory.
bin/hadoop fs -copyFromLocal ~/Downloads/peter_pan.txt hdfs/input

Building a MapReduce (∼30mn – medium)

The goal of this exercise is to compile a Hadoop program and build a JAR file that we will submit as a Hadoop job.

Open your IDE and create a new Scala project. Then, add the following JAR files to your project (these files are in the directory we created before, hadoop-3.3.6):
  • share/hadoop/common/hadoop-common-3.3.6.jar
  • share/hadoop/hdfs/hadoop-hdfs-3.3.6.jar
  • share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.3.6.jar
  • the entire directory share/hadoop/common/lib/
The easiest way is to first copy them into a new directory and then include them. This step depends on the IDE you are using. If you are not sure, Google the solution.
For IntelliJ, you have to go to File, Project Structure, Modules, Dependencies, +, Jars or Directories... Then select all jars individually.
You can also use SBT to include the dependencies automatically. In IntelliJ, select Scala and SBT when you create a new project.
Once the project is created, you will find a file called build.sbt. Add the following lines:
libraryDependencies ++= Seq(
  "org.apache.hadoop" % "hadoop-common" % "3.3.6",
  "org.apache.hadoop" % "hadoop-hdfs" % "3.3.6",
  "org.apache.hadoop" % "hadoop-mapreduce-client-core" % "3.3.6"
)
After making the changes, you should see a button in your editor called Load sbt Changes (Ctrl + Shift + O). Click on it to apply the changes.

Create a new Scala object called FirstExample and copy-paste the following code:
import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FirstExample {
  def main(args: Array[String]): Unit = {
    val uri = new URI("file:////PATH_TO_HADOOP/hdfs")
    val fs = FileSystem.get(uri, new Configuration())
    val filePath = new Path("file:////PATH_TO_HADOOP/hdfs/input")
    val status = fs.listStatus(filePath)
    status.map(sts => sts.getPath).foreach(println)
  }
}
If you did the previous question correctly, you should not have errors and you should be able to run the code directly in your IDE (if you are using SBT, do not forget to press Load sbt Changes (Ctrl + Shift + O)). It should print the path of the file containing Peter Pan.

We will create our first Map-Reduce task that counts words in a text. Create a new Scala object called WordCount and copy-paste the following code:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import java.lang
import scala.jdk.CollectionConverters.IterableHasAsScala

object WordCount {
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
    val one = new IntWritable(1)

    override def map(key: Object, value: Text,
                     context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val words = value.toString.split(" ")
      words.foreach(x => context.write(new Text(x), one))
    }
  }

  class SumReducer extends Reducer[Text, IntWritable, Text, IntWritable] {
    override def reduce(key: Text, values: lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, Text, IntWritable]#Context): Unit = {
      val sum = values.asScala.foldLeft(0)(_ + _.get)
      context.write(key, new IntWritable(sum))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "word count")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setCombinerClass(classOf[SumReducer])
    job.setReducerClass(classOf[SumReducer])
    job.setInputFormatClass(classOf[TextInputFormat])
    job.setOutputFormatClass(classOf[TextOutputFormat[Text, IntWritable]])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
We will make sense of this code in the next question. For now, we want to build our project to create a JAR file. As this step depends on your IDE, we cannot give instructions on how to do it. Search on Google how to build a JAR. The main class must be WordCount.
If you are using SBT, first specify the main class in the build.sbt by adding Compile / packageBin / mainClass := Some("WordCount"). Then, in your SBT console, run the command package (you might need to reload the configuration with reload). Your jar should appear in a directory called target/scala-2.13 if you are using IntelliJ.
Once you have your JAR file, you can submit your job to Hadoop by running the following command:
bin/hadoop jar PATH_TO_YOUR_JAR/YOUR_JAR.jar file:///PATH_TO_HADOOP/hdfs/input/peter_pan.txt file:///PATH_TO_HADOOP/hdfs/output/peter_pan_wc
If this command does not work and throws a NoClassDefFoundError, try running the following commands (in your Hadoop directory, adapting the Scala version to the one used in your build.sbt):
wget https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.13.12/scala-library-2.13.12.jar
mv scala-library-2.13.12.jar share/hadoop/common

Using a Hadoop command, copy the resulting files to your local disk. What do you observe about peter_pan_wc?
peter_pan_wc is a directory that contains the result of the computations.

Let's now have a closer look at the code. How are the input and output types of the reducer specified? Note that Hadoop uses its own types.
The mapper and reducer extend generic classes that take four type parameters: the input key type, the input value type, the output key type and the output value type. Then, one must override the map or the reduce method.
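To make the role of the type parameters concrete, here is a minimal in-memory sketch written with plain Scala collections instead of the Hadoop classes. The object LocalMapReduce and its methods run and wordCount are hypothetical names introduced only for this illustration. The sketch also uses the line index as the input key and plain Scala types (Long, String, Int) where Hadoop would use its own Writable types.

```scala
object LocalMapReduce {
  // Hypothetical helper: applies the mapper, groups the intermediate pairs
  // by key (the "shuffle"), then applies the reducer to each group.
  def run[KIn, VIn, KMid, VMid, KOut, VOut](
      input: Seq[(KIn, VIn)],
      mapper: (KIn, VIn) => Seq[(KMid, VMid)],
      reducer: (KMid, Seq[VMid]) => Seq[(KOut, VOut)]
  ): Seq[(KOut, VOut)] = {
    val mapped = input.flatMap { case (k, v) => mapper(k, v) }
    val shuffled = mapped.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2)) }
    shuffled.toSeq.flatMap { case (k, vs) => reducer(k, vs) }
  }

  // Word count: the mapper has types (Long, String) => (String, Int),
  // the reducer has types (String, Int) => (String, Int).
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    // Assumption: the input key is the line index, used here only as a placeholder.
    val input = lines.zipWithIndex.map { case (line, i) => (i.toLong, line) }
    run[Long, String, String, Int, String, Int](
      input,
      (_, line) => line.split(" ").toSeq.map(w => (w, 1)),
      (word, ones) => Seq((word, ones.sum))
    ).toMap
  }

  def main(args: Array[String]): Unit =
    wordCount(Seq("peter pan", "peter and wendy")).toSeq.sortBy(_._1).foreach(println)
}
```

The output types of the mapper (here String and Int) must match the input types of the reducer, which is the same constraint that links TokenizerMapper and SumReducer in WordCount.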

What is the operation equivalent to the emit we saw in the lecture?
context.write(key, value)

Where are the mapper and reducer assigned to the job?
In the main, using job.setMapperClass and job.setReducerClass.
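The main function of WordCount also registers SumReducer as a combiner with job.setCombinerClass. The sketch below, in plain Scala without Hadoop, suggests why this is safe for a sum: adding partial sums gives the same result as adding all the values at once. The object CombinerSketch and the sample counts are assumptions made for the illustration.

```scala
object CombinerSketch {
  // Same logic as SumReducer: add up all the counts received for one word.
  def sum(counts: Seq[Int]): Int = counts.foldLeft(0)(_ + _)

  // Reducer only: all the values emitted by the map tasks reach the reducer.
  def withoutCombiner(mapOutputs: Seq[Seq[Int]]): Int =
    sum(mapOutputs.flatten)

  // Combiner + reducer: each map task pre-aggregates its own values,
  // and the reducer only sums one partial result per map task.
  def withCombiner(mapOutputs: Seq[Seq[Int]]): Int =
    sum(mapOutputs.map(sum))

  def main(args: Array[String]): Unit = {
    // Hypothetical counts emitted for the word "peter" by two map tasks.
    val mapOutputs = Seq(Seq(1, 1, 1), Seq(1, 1))
    println(withoutCombiner(mapOutputs))
    println(withCombiner(mapOutputs))
  }
}
```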

Building MapReduce Jobs (∼40mn – medium)

In this exercise, we will follow the structure of the MapReduce job from the previous exercise to write new MapReduce tasks.

Write a MapReduce job that associates each word with the most frequent word that follows it. Depending on your implementation, can you use a combiner?
You might need to simplify the text by turning it to lowercase (.map(_.toLower), or equivalently .toLowerCase) and by removing non-alphanumeric characters (.replaceAll("[^A-Za-z0-9 ]", "")).
The most frequent word after peter should be pan. To turn a Text object into a String, you need to use the method toString.
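The text simplification described above can be tried outside Hadoop first. The following sketch uses a hypothetical object Tokenizer. The filter on empty tokens is an addition that the hints do not mention: it avoids producing empty "words" when two spaces follow each other.

```scala
object Tokenizer {
  // Lowercase the line, drop every character that is not a letter,
  // a digit or a space, then split on spaces.
  def tokenize(line: String): Seq[String] =
    line.toLowerCase
      .replaceAll("[^A-Za-z0-9 ]", "")
      .split(" ")
      .filter(_.nonEmpty) // assumption: empty tokens are not wanted
      .toSeq

  // Pairs (word, following word) for one line of text.
  def followerPairs(line: String): Seq[(String, String)] = {
    val words = tokenize(line)
    words.zip(words.drop(1))
  }

  def main(args: Array[String]): Unit =
    followerPairs("Peter Pan, said  Wendy!").foreach(println)
}
```

Note that pairs are built line by line, so a word at the end of a line is not associated with the first word of the next line.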
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import java.lang
import scala.jdk.CollectionConverters.IterableHasAsScala

object MostFrequentFollower {
  // Emits one (word, following word) pair for each pair of consecutive words in a line.
  class TokenizerMapper extends Mapper[Object, Text, Text, Text] {
    override def map(key: Object, value: Text,
                     context: Mapper[Object, Text, Text, Text]#Context): Unit = {
      val words = value.toString.map(_.toLower).replaceAll("[^A-Za-z0-9 ]", "").split(" ")
      for (i <- Range(0, words.length - 1)) {
        context.write(new Text(words(i)), new Text(words(i + 1)))
      }
    }
  }

  // Receives all the followers of a word and keeps the most frequent one.
  class MostFrequentReducer extends Reducer[Text, Text, Text, Text] {
    override def reduce(key: Text, values: lang.Iterable[Text],
                        context: Reducer[Text, Text, Text, Text]#Context): Unit = {
      val maxi = values.asScala.map(x => x.toString).toList.groupBy(x => x).maxBy(_._2.size)
      context.write(key, new Text(maxi._1))
    }
  }

  def main(args: Array[String]): Unit = {
    val configuration = new Configuration
    val job = Job.getInstance(configuration, "most frequent follower")
    job.setJarByClass(this.getClass)
    job.setMapperClass(classOf[TokenizerMapper])
    job.setReducerClass(classOf[MostFrequentReducer])
    job.setOutputKeyClass(classOf[Text])
    job.setOutputValueClass(classOf[Text])
    FileInputFormat.addInputPath(job, new Path(args(0)))
    FileOutputFormat.setOutputPath(job, new Path(args(1)))
    System.exit(if (job.waitForCompletion(true)) 0 else 1)
  }
}
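Regarding the combiner question: with a reducer that directly outputs the most frequent follower, reusing the reducer as a combiner can give a wrong answer, because the most frequent value among local winners is not necessarily the global winner. The plain Scala sketch below illustrates this with made-up data. The object FollowerCombinerCheck and the three partitions are hypothetical.

```scala
object FollowerCombinerCheck {
  // Same selection logic as a reducer that keeps the most frequent follower.
  def mostFrequent(followers: Seq[String]): String =
    followers.groupBy(identity).maxBy(_._2.size)._1

  // Hypothetical followers of the word "peter", as seen by three map tasks.
  val partitions: Seq[Seq[String]] = Seq(
    Seq("hook", "hook", "pan"),
    Seq("hook", "hook", "pan"),
    Seq("pan", "pan", "pan", "pan", "pan")
  )

  // Reducer only: "pan" appears 7 times and "hook" 4 times.
  def withoutCombiner: String = mostFrequent(partitions.flatten)

  // Reducer reused as combiner: the reducer only sees the local winners
  // "hook", "hook" and "pan", and the counts are lost.
  def withReducerAsCombiner: String = mostFrequent(partitions.map(mostFrequent))

  def main(args: Array[String]): Unit = {
    println(withoutCombiner)
    println(withReducerAsCombiner)
  }
}
```

A different implementation could still benefit from a combiner, for instance by having the mapper emit a count for each (word, follower) pair so that partial counts can be summed before the final selection. This is one possible design, not the only one.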

How many different words does this book contain?
  • If you want two MapReduce jobs, you can create a second job in the main function and run it after the first one. The configuration will be similar to the first one.
  • To give the Mapper output key and value types that differ from the job output types, you can use setMapOutputKeyClass and setMapOutputValueClass. Be careful! Having a combiner can be a problem in this case.
  • The second job does not take a text as input, but rather a list of key/value pairs. You can use the class KeyValueTextInputFormat to read it correctly.
  • You will have to save the results of the first job in an intermediate file that you can pass as an argument.
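The two-job idea from the hints above can be sketched without Hadoop as follows. The object DistinctWordsSketch and its methods stage1 and stage2 are hypothetical names. The sketch assumes that the intermediate file contains one key/value pair per line, separated by a tab, which is the default separator of TextOutputFormat and KeyValueTextInputFormat.

```scala
object DistinctWordsSketch {
  // Job 1: group identical words (as the shuffle would do) and write one
  // line "1<TAB>1" per distinct word to the intermediate file.
  def stage1(words: Seq[String]): Seq[String] =
    words.groupBy(identity).keys.toSeq.map(_ => "1\t1")

  // Job 2: parse each line as a key/value pair, then sum the values per key.
  def stage2(lines: Seq[String]): Map[Int, Int] =
    lines
      .map { line =>
        val Array(k, v) = line.split("\t", 2)
        (k.toInt, v.toInt)
      }
      .groupBy(_._1)
      .map { case (k, kvs) => (k, kvs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val intermediate = stage1(Seq("peter", "pan", "peter", "wendy"))
    println(stage2(intermediate))
  }
}
```

Since every record of the second stage has the same key, a single reduce call receives all the values, and its sum is the number of distinct words.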
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, KeyValueTextInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
import org.apache.hadoop.mapreduce.{Job, Mapper, Reducer}

import java.lang
import scala.jdk.CollectionConverters.IterableHasAsScala

object NumberUniqueWords {
  // Job 1, map: emits (word, 1) for every word of the line.
  class TokenizerMapper extends Mapper[Object, Text, Text, IntWritable] {
    val one = new IntWritable(1)

    override def map(key: Object, value: Text,
                     context: Mapper[Object, Text, Text, IntWritable]#Context): Unit = {
      val words = value.toString.map(_.toLower).replaceAll("[^A-Za-z0-9 ]", "").split(" ")
      words.foreach(x => context.write(new Text(x), one))
    }
  }

  // Job 1, reduce: called once per distinct word, emits (1, 1) each time.
  class OneReducer extends Reducer[Text, IntWritable, IntWritable, IntWritable] {
    val one = new IntWritable(1)

    override def reduce(key: Text, values: lang.Iterable[IntWritable],
                        context: Reducer[Text, IntWritable, IntWritable, IntWritable]#Context): Unit = {
      context.write(one, one)
    }
  }

  // Job 2, map: converts the textual key and value read from the intermediate file back to integers.
  class IdentityMapper extends Mapper[Object, Text, IntWritable, IntWritable] {
    override def map(key: Object, value: Text,
                     context: Mapper[Object, Text, IntWritable, IntWritable]#Context): Unit = {
      context.write(new IntWritable(key.toString.toInt), new IntWritable(value.toString.toInt))
    }
  }

  // Job 2, reduce: sums the values, which gives the number of distinct words.
  class SumReducer extends Reducer[IntWritable, IntWritable, IntWritable, IntWritable] {
    val one = new IntWritable(1)

    override def reduce(key: IntWritable, values: lang.Iterable[IntWritable],
                        context: Reducer[IntWritable, IntWritable, IntWritable, IntWritable]#Context): Unit = {
      context.write(one, new IntWritable(values.asScala.foldLeft(0)(_ + _.get)))
    }
  }

  // Arguments: input path, intermediate path, output path.
  def main(args: Array[String]): Unit = {
    val configuration = new Configuration

    val job1 = Job.getInstance(configuration, "number unique words job 1")
    job1.setJarByClass(this.getClass)
    job1.setMapperClass(classOf[TokenizerMapper])
    job1.setReducerClass(classOf[OneReducer])
    job1.setInputFormatClass(classOf[TextInputFormat])
    job1.setOutputFormatClass(classOf[TextOutputFormat[IntWritable, IntWritable]])
    job1.setMapOutputKeyClass(classOf[Text])
    job1.setMapOutputValueClass(classOf[IntWritable])
    job1.setOutputKeyClass(classOf[IntWritable])
    job1.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job1, new Path(args(0)))
    FileOutputFormat.setOutputPath(job1, new Path(args(1)))
    // Stop here if the first job fails, since the second job reads its output.
    if (!job1.waitForCompletion(true)) System.exit(1)

    val job2 = Job.getInstance(configuration, "number unique words job 2")
    job2.setJarByClass(this.getClass)
    job2.setMapperClass(classOf[IdentityMapper])
    job2.setReducerClass(classOf[SumReducer])
    job2.setInputFormatClass(classOf[KeyValueTextInputFormat])
    job2.setOutputFormatClass(classOf[TextOutputFormat[IntWritable, IntWritable]])
    job2.setOutputKeyClass(classOf[IntWritable])
    job2.setOutputValueClass(classOf[IntWritable])
    FileInputFormat.addInputPath(job2, new Path(args(1)))
    FileOutputFormat.setOutputPath(job2, new Path(args(2)))
    System.exit(if (job2.waitForCompletion(true)) 0 else 1)
  }
}