Hadoop Streaming API on Amazon EMR

I’m writing this post hoping to help people better understand how the Hadoop streaming API works, based on my experience and findings on EMR.

 

Intro

During the last few weeks I have been working on an MR Python script which I ran on an Amazon Hadoop cluster (EMR, the Elastic MapReduce service).

AWS (Amazon Web Services) lets you use a simple web interface to configure, create and launch your MR jobs. Despite this, I preferred to SSH into the master node and launch the job using the CLI. The AWS interface allows you to define the number and type of machines running in the cluster, the bootstrap options, and some steps. So this is what you may wanna use in the future, but for streaming map reduce jobs the arguments you can pass there are slightly different: -files and -archives seem to be replaced (not sure, not tested) by something like “-cache-files” and “-cache-archives”. But please, read the Amazon docs if you wanna use the web interface to launch a streaming MR job.

 

Terminology

Before going forward, I would like to introduce some very basic concepts that may be unknown to people approaching this world for the first time. You will soon read things like:

Bootstrap options: the scripts or JARs that run on every machine in the cluster right after boot. A streaming MR job may need additional software on the nodes, and a bootstrap script is the place to install it. You can set this up in the EMR web interface by creating a “Custom action” and pointing it to s3://mybucket/setup.sh. You have to create the setup.sh bash script yourself: it is a simple script that may install stuff on the nodes with commands like apt-get install XXXX and/or pip install XXX, making the dependencies available to each node. A sketch of such a script follows below.
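For example, a minimal setup.sh could look like this (the packages installed here are just placeholders for whatever your job actually needs):

#!/bin/bash
# Bootstrap action: runs on every node of the cluster right after boot.
# Abort at the first error so a broken bootstrap fails loudly.
set -e

# Placeholder dependencies -- replace with the ones your job really needs.
sudo apt-get update
sudo apt-get install -y python-dev
sudo pip install simplejson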

Steps: an EMR cluster can run as many steps as you want, whenever you need (as long as the cluster is up and running! :D). You are not forced to define them when you spin up the cluster. A step always runs a JAR; a streaming MR job usually runs hadoop-streaming.jar, which ships with Hadoop. The arguments we will soon see in this post are the ones accepted by the official Hadoop streaming JAR.

EMR & EC2: in the AWS web console, under the “Services” drop-down menu, you can find “Elastic Map Reduce” and “EC2”. If you don’t know what the relation between them is, you’ll understand it quite soon. From the “EMR” console you can basically launch your cluster, add steps and check the status of the cluster itself. From the EC2 management console you get a detailed view of the status of each node in the cluster (e.g. CPU/memory/IO usage and network info).

 

 

Launching an MR Job from the console

To define a streaming map-reduce job (step), we have to SSH into the master node (its IP is visible in the EMR console) and launch the following command, assuming that all the files we need are already in S3.


hadoop jar /home/hadoop/contrib/streaming/hadoop-streaming.jar \
  -files s3://aloisio-dev/something.sh#something-symlink.sh \
  -archives s3://aloisio-dev/something.tar.gz#somethingUncompressedDIR \
  -mapper "/bin/sh -e setup-wrapper.sh python mrj.py --step-num=0 --mapper" \
  -reducer "/bin/sh -e setup-wrapper.sh python mrj.py --step-num=0 --reducer" \
  -output s3://aloisio-dev/out \
  -input s3://aloisio-dev/in

Here is the description of each argument:

“-files” copies some files into the cache. In this case, “-files” copies “something.sh” from S3 into the Hadoop cache AND creates a symlink “something-symlink.sh” in the working directory pointing to the actual file in the cache, “something.sh”. Multiple files can be passed as a comma-separated list.

“-archives” copies one or more archives (JAR or TGZ) from S3, uncompresses them in the cache and creates a symlink (in this example called somethingUncompressedDIR) in the working dir. This is useful if our job needs some other Python files that make up our own package. A Python package is usually a dir containing an __init__.py and some other files. A sketch of how to build and upload such an archive follows below.
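For example, assuming a hypothetical package directory called mypackage/, you could build the archive and copy it to S3 like this (bucket and file names reuse the ones from the command above; the AWS CLI is assumed to be installed):

# Build a gzipped tarball containing the hypothetical Python package.
tar -czf something.tar.gz mypackage/

# Upload it next to the other job files (the path is just an example).
aws s3 cp something.tar.gz s3://aloisio-dev/something.tar.gz

With this layout, once the archive is uncompressed on the nodes, somethingUncompressedDIR/mypackage is what your job will import.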

“-mapper” and “-reducer” define the commands to run as mapper and reducer. “/bin/sh -e setup-wrapper.sh …” defines PYTHONPATH, which is necessary if your code lives in a separate package; a sketch of such a wrapper is shown below.
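The wrapper itself is not shown in full in this post, but a minimal sketch could look like this (somethingUncompressedDIR matches the symlink name used in the command above):

#!/bin/sh
# setup-wrapper.sh -- make the uncompressed archive importable by the job,
# then run whatever command was passed after the wrapper
# (e.g. "python mrj.py --step-num=0 --mapper").
export PYTHONPATH=$PWD/somethingUncompressedDIR:$PYTHONPATH
exec "$@"

Since the streaming tasks run the wrapper from their own working directory, $PWD here points at the directory containing the symlinks created by “-files” and “-archives”.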

“-input” and “-output” must be S3 or HDFS locations. You can browse S3 using the Web interface and HDFS using the command “hadoop fs -ls /”. Please note that HDFS is erased when the cluster is terminated.
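For example, both locations can be inspected from the master node; the paths below reuse the bucket from the command above, and hadoop fs can read s3:// paths because EMR configures the S3 filesystem for you:

# List the HDFS root on the cluster.
hadoop fs -ls /

# Once the step has finished, peek at the output written to S3
# (the part-XXXXX file names depend on the number of reducers).
hadoop fs -ls s3://aloisio-dev/out/
hadoop fs -cat s3://aloisio-dev/out/part-00000 | head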

Going into more detail, here you can see the Hadoop cache (staging area) for a map reduce job. “archives” and “files” are the dirs containing the files downloaded from S3 (if any). job.jar is the JAR which will be run by Hadoop.

hadoop@ip:~/XX$ hadoop fs -ls /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/
Found 6 items
drwx------ - hadoop supergroup 0 2014-03-18 11:37 /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/archives
drwx------ - hadoop supergroup 0 2014-03-18 11:37 /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/files
-rw-r--r-- 10 hadoop supergroup 111670 2014-03-18 11:37 /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/job.jar
-rw-r--r-- 10 hadoop supergroup 227911 2014-03-18 11:37 /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/job.split
-rw-r--r-- 1 hadoop supergroup 47146 2014-03-18 11:37 /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/job.splitmetainfo
-rw-r--r-- 1 hadoop supergroup 26364 2014-03-18 11:37 /mnt/var/lib/hadoop/tmp/mapred/staging/hadoop/.staging/job_201403180914_0006/job.xml

The symlink files in the working directory will point to the files and dir in “files” and “archives”.

Running a Python MR Job and setting PYTHONPATH

This part is Python-specific, sorry! 😀 Feel free to skip it if you are not interested in the topic.

If you have a Python package in an archive, it will be available to your Python code ONLY if you define a PYTHONPATH environment variable. PYTHONPATH can be defined in two ways:

  • using “-cmdenv PYTHONPATH=$PWD/somethingUncompressedDIR” (see the sketch below);
  • adding export PYTHONPATH=$PWD/somethingUncompressedDIR to a setup-wrapper.sh script that you can run as shown previously.
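A sketch of the first option, using the symlink name as a relative path (relative PYTHONPATH entries are resolved against the task’s working directory, which is where the symlink lives):

hadoop jar /home/hadoop/contrib/streaming/hadoop-streaming.jar \
  -archives s3://aloisio-dev/something.tar.gz#somethingUncompressedDIR \
  -cmdenv PYTHONPATH=somethingUncompressedDIR \
  -mapper "python mrj.py --step-num=0 --mapper" \
  -reducer "python mrj.py --step-num=0 --reducer" \
  -output s3://aloisio-dev/out \
  -input s3://aloisio-dev/in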

 

Monitoring an MR Job

There are a few options here. You can just look at the output of the job you launched, which looks like normal Hadoop command output, or you can use the Hadoop trackers, e.g. http://nodeip:9100/jobdetails.jsp?jobid=job_123451234123_0000.
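If those ports are not reachable from outside the cluster (they usually are not, depending on your security group), an SSH tunnel to the master node does the trick; the key file and IP below are placeholders:

# Forward the job tracker UI to localhost:9100 (key and IP are placeholders).
ssh -i ~/mykey.pem -N -L 9100:localhost:9100 hadoop@<master-public-ip>
# Then browse http://localhost:9100/ from your local machine.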

If you are interested in knowing the status of the machines in order to find bottlenecks, you may wanna have a look at the graphs in the “EMR” or “EC2” web consoles.

 

Logs and troubleshooting

Before running an MR job on the cluster you may wanna run it locally. The mrjob Python framework, for instance, lets you do that quite easily. Even if everything runs perfectly on your local machine, that is not sufficient to guarantee the process will run smoothly on a different setup such as a cluster of machines: you may hit issues due to wrong inputs, missing dependencies and so on.
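For example, with the mrjob framework a quick local run can be as simple as this (file names are placeholders):

# Run the job locally on a small sample before submitting it to the cluster.
python mrj.py -r local input-sample.txt > local-output.txt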

To read the logs you can use the mrjob Python framework or go the more generic way: just run the lynx browser on the master node with the following command:

lynx http://node_ip:9103/logs

Then you can navigate into the attempts dir and read your logs.

 

Conclusion

This post is far from being exhaustive on the topic, but hopefully it can give some pointers to anyone willing to run a streaming map reduce job on EMR. I’m not the most expert person in the world, so I’m open to feedback! Thanks for reading and have fun!! 😀

 

References

  1. http://hadoop.apache.org/docs/r1.2.1/streaming.html
