Friday, November 18, 2016

Shell script for tree-structured copying of a large data file to a large number of nodes with scp

Sometimes you need to copy a large file to a number of remote hosts. I recently had a situation where I had to copy a 56GB data file to around 30 compute nodes in an HPC cluster, and I did not have the option of copying it to the shared disk (it was pretty filled up), so I had to copy the file to the private scratch area of each node. Having the data in the private scratch area is also better for the application, since it gives better read performance (at least on the system I was working on).

Copying to each node from my machine or from the head node would take a very long time because of network bandwidth limitations, so I came up with a small shell script that does the copy in a tree-like structure. Given the set of nodes, the data file and the destination, the script first copies the data to the first node in the list, say node1. It then copies from both the head node and node1 to node2 and node3 in parallel. In the next round it copies the file from all four nodes that already have it (the head node and three compute nodes) to four more nodes, and so on, doubling the number of sources in each round until the file has been copied to every node in the list.

Inputs:

1. A text file listing the nodes, one node per line

2. The data file to be copied

3. The destination directory on each node

The script has to be run from the folder that contains the data file. Changing that would be a simple improvement (which I am too lazy to do at the moment). A usage example follows the script below.


#!/bin/bash

# Arguments: 1) text file listing the nodes, 2) file to be copied, 3) destination directory on each node
# The script has to be executed from the directory that contains the data file (needs to be improved).

# Read the node list into an array.
counter=0
for line in `cat $1`; do
        nodes[$counter]=$line
        counter=$((counter+1))
done

counter2=0
counter3=0
while (( $counter2 < $counter ))
do
        # The local machine copies the file to the next node that does not have it yet.
        scp $2 ${nodes[ $counter2 ]}:$3 &
        if (( $counter2 > 0 ))
        then
                # Every node that already has the file sends it to one more node, all in parallel.
                counter3=0
                while (( $counter3 < $counter2 ))
                do
                        if (( $((counter2 + counter3 + 1)) < $counter ))
                        then
                                scp ${nodes[ $counter3 ]}:$3/$2 ${nodes[ $((counter2 + counter3 + 1)) ]}:$3 &
                                counter3=$((counter3+1))
                        else
                                break
                        fi
                done
                counter2=$((counter2+counter3))
        fi
        # Wait for every copy in this round to finish before starting the next round.
        wait
        counter2=$((counter2+1))
done
wait
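
A minimal invocation would look like the following (the script name treecopy.sh, the node list nodes.txt and the paths are placeholders of mine; adjust them to your environment):

$ chmod +x treecopy.sh
$ ./treecopy.sh nodes.txt data.bin /scratch/username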

I hope this helps someone who runs into the same issue.

Saturday, June 18, 2016

Setting up Heron Cluster with Apache Aurora Locally

In this post we will look at how to set up the Heron stream processing engine on Apache Aurora on our local machine. Oh boy, this is going to be a long post :D. I am doing this on Ubuntu 14.04 and the steps should be similar on any Linux machine. Heron supports deployment on Apache Aurora out of the box; Apache Aurora will act as the Scheduler for Heron after the setup is complete. To make this work you first need to set up Apache Zookeeper and allow Heron to communicate with it. Here Apache Zookeeper will act as the State Manager of the Heron deployment. If you just want to set up a local cluster without the hassle of installing Aurora, take a look at my previous blog post - Getting started with Heron stream processing engine in Ubuntu 14.04

Setting Up Apache Aurora Cluster locally 

The first thing we need to do is set up Apache Aurora locally. I will try to explain as much of the configuration as I can as we go. First let's get an Apache Aurora cluster running on our local machine. The steps are extracted from here.

Step 1: Install VirtualBox and Vagrant

Download and install VirtualBox and Vagrant on your machine. If Vagrant is successfully installed, the following command should list several common commands for the tool.
$ vagrant
Step 2: Clone the Aurora repository

You can get the Aurora source repository with the following command
$ git clone git://git.apache.org/aurora.git
Once the clone is complete, cd into the aurora folder

$ cd aurora

Step 3: Starting Local Aurora Cluster

To start the local cluster all you have to do is execute the following command. It will install all the needed dependencies, such as Apache Mesos and Zookeeper, in the VM.
$ vagrant up
Additionally, to get rid of some of the warning messages you see during the up command, execute the following command
$ vagrant plugin install vagrant-vbguest
You can verify that the Aurora cluster is running properly from your web browser: if you go to http://192.168.33.7:8081/scheduler you will notice that the default cluster set up by Aurora is named "devcluster". This will be important when submitting topologies from Heron.

Installing Heron within the VM

Now that we have the Aurora cluster set up, we need to install Heron inside the cluster VM to get our Heron deployment working. Since this is a fresh VM instance you will have to install basic software such as "unzip" and set the JAVA_HOME path as an environment variable (just add it to the .bashrc file; a sketch is shown after the ssh command below). Once you have the basics working, follow the steps below to install Heron in the VM. You can ssh into the VM with the following command

$ vagrant ssh
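
As a minimal sketch, setting up the basics inside the VM could look like the following. The package list and the JDK path are assumptions on my part (the JDK path matches the one used later in scheduler.yaml), so verify them in your VM:

$ sudo apt-get update && sudo apt-get install -y unzip
$ # Append the following to ~/.bashrc, then run "source ~/.bashrc"
$ # (the JDK path is an assumption; check with: ls /usr/lib/jvm)
$ export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-amd64
$ export PATH=$JAVA_HOME/bin:$PATH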

Step 1.a : Download installation script files

You can download the script files that match Ubuntu from https://github.com/twitter/heron/releases/tag/0.14.0

For the 0.14.0 release the files you need to download will be the following.

heron-client-install-0.14.0-ubuntu.sh
heron-tools-install-0.14.0-ubuntu.sh

Optionally - you will not need the following for the steps in this blog post

heron-api-install-0.14.0-ubuntu.sh
heron-core-0.14.0-ubuntu.tar.gz

Step 1.b: Execute the client and tools shell scripts

$ chmod +x heron-client-install-VERSION-PLATFORM.sh
$ ./heron-client-install-VERSION-PLATFORM.sh --user
Heron client installer
----------------------

Uncompressing......
Heron is now installed!

Make sure you have "/home/vagrant/bin" in your path.

After this you need to add "/home/vagrant/bin" to your PATH. You can either execute the following command or add it to the end of the .bashrc file (which is more convenient).
$ export PATH=$PATH:/home/vagrant/bin
Install the following packages to make sure that you have all the needed dependencies in the VM. You might have to run sudo apt-get update before executing the following.
$ sudo apt-get install git build-essential automake cmake libtool zip libunwind-setjmp0-dev zlib1g-dev unzip pkg-config -y

Configuring State Manager ( Apache Zookeeper )

Since Heron only uses Apache Zookeeper for coordination, the load on the Zookeeper node is minimal. Because of this a single Zookeeper node is sufficient, and if you already have a Zookeeper instance running for some other task you can simply reuse it. Since Apache Aurora already uses a Zookeeper instance, we can use that instance for Heron's State Manager tasks. First you need to configure Heron to work with this Zookeeper instance. You can find the meaning of each attribute here. The State Manager configurations are located in the directory /home/vagrant/.heron/conf/aurora

Open the file statemgr.yaml using vim ( or some other text editor you prefer ) and add/edit the file to include the following. 

# local state manager class for managing state in a persistent fashion
heron.class.state.manager: com.twitter.heron.statemgr.zookeeper.curator.CuratorStateManager

# local state manager connection string
heron.statemgr.connection.string:  "127.0.0.1:2181"

# path of the root address to store the state in a local file system
heron.statemgr.root.path: "/heronroot"

# create the zookeeper nodes, if they do not exist
heron.statemgr.zookeeper.is.initialize.tree: True

# timeout in ms to wait before considering zookeeper session is dead
heron.statemgr.zookeeper.session.timeout.ms: 30000

# timeout in ms to wait before considering zookeeper connection is dead
heron.statemgr.zookeeper.connection.timeout.ms: 30000

# maximum number of retries for zookeeper operations
heron.statemgr.zookeeper.retry.count: 10

# duration of time to wait until the next retry
heron.statemgr.zookeeper.retry.interval.ms: 10000

Creating Paths in Zookeeper

Next we need to create some paths in Zookeeper, since Heron does not create them automatically. Because the Aurora installation already installed Zookeeper, we can use the Zookeeper CLI to create these paths manually.
$ sudo /usr/share/zookeeper/bin/zkCli.sh
This will connect to the Zookeeper instance running locally. Then execute the following commands from within the client to create the paths /heronroot/topologies and /heron/topologies. Later, in "Associating the new Aurora cluster with the Heron UI", we will see that only /heronroot/topologies is strictly needed, but for now let's create both to make sure we don't get any errors when we run things.

create /heronroot null
create /heronroot/topologies null
create /heron null
create /heron/topologies null
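
If you want to double-check that the paths exist, you can list them from the same Zookeeper client before exiting (just a sanity check; ls is a standard Zookeeper CLI command):

ls /heronroot
ls /heron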

Configuring Scheduler ( Apache Aurora )

Next we need to configure Apache Aurora as the Scheduler for our local Heron cluster. To do this we need to edit the scheduler.yaml file, which is also located in /home/vagrant/.heron/conf/aurora. Add/edit the file to include the following. More information about the parameters can be found here

# scheduler class for distributing the topology for execution
heron.class.scheduler: com.twitter.heron.scheduler.aurora.AuroraScheduler

# launcher class for submitting and launching the topology
heron.class.launcher: com.twitter.heron.scheduler.aurora.AuroraLauncher

# location of the core package
heron.package.core.uri: file:///home/vagrant/.heron/dist/heron-core.tar.gz

# location of java - pick it up from shell environment
heron.directory.sandbox.java.home: /usr/lib/jvm/java-1.8.0-openjdk-amd64/

# Invoke the IScheduler as a library directly
heron.scheduler.is.service: False

Additionally, edit the client.yaml file and change the core uri to keep it consistent.
# location of the core package
heron.package.core.uri: file:///home/vagrant/.heron/dist/heron-core.tar.gz

Important Step: Change folder name aurora to devcluster

Next we need to rename the folder /home/vagrant/.heron/conf/aurora to /home/vagrant/.heron/conf/devcluster. This is because the name of our Aurora cluster is devcluster, as we noted in a previous step. You can do this with the following commands
$ cd /home/vagrant/.heron/conf/
$ mv aurora devcluster
Now we are almost done :).

Submitting Example Topology to Aurora cluster

Now we can submit a topology to the Aurora cluster. This can be done with the following command.
$ heron submit devcluster/pulasthi/devel --config-path ~/.heron/conf/ ~/.heron/examples/heron-examples.jar com.twitter.heron.examples.ExclamationTopology ExclamationTopology

Now you should be able to see the topology in the Aurora UI ( http://192.168.33.7:8081/scheduler/pulasthi ) .


Understanding the parameters 

I will try to explain some of the important parameters used in this command. The first parameter, "devcluster/pulasthi/devel", defines the cluster, role and env (env can be prod | devel | test | staging). The cluster is the name of the Aurora cluster, which is devcluster in our case. You can use something like your name for the role, and for env you need to choose one of the allowed values.

--config-path points to the config folder. Heron automatically looks inside this folder for a sub-folder named after the cluster, which is why we had to rename the aurora conf folder to devcluster.
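
Putting it together, the general shape of the submit command is shown below. The role "alice" and env "test" are purely illustrative placeholders (the example above uses pulasthi/devel); everything else is taken directly from the command we just ran.

$ heron submit <cluster>/<role>/<env> --config-path <conf folder> <topology jar> <main class> <topology name>
$ heron submit devcluster/alice/test --config-path ~/.heron/conf/ ~/.heron/examples/heron-examples.jar com.twitter.heron.examples.ExclamationTopology ExclamationTopology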

Now that everything is working, we need to perform one last step to be able to see the topologies that appear in the Aurora UI in the Heron UI as well.

Associating the new Aurora cluster with the Heron UI

The Heron UI uses information that it gets from the Heron Tracker when displaying information in its interface. So, in order to allow the Heron UI to show the Aurora cluster information, we need to modify the Heron Tracker configuration so that it can identify the Aurora cluster.

Heron Tracker configurations are located at /home/vagrant/.herontools/conf; the configuration file is named heron_tracker.yaml. By default you should see the following in the file

statemgrs:
  -
    type: "file"
    name: "local"
    rootpath: "~/.herondata/repository/state/local"
    tunnelhost: "localhost"
  -
    type: "zookeeper"
    name: "localzk"
    hostport: "localhost:2181"
    rootpath: "/heron"
    tunnelhost: "localhost"

You can see that there are already two entries. Earlier we had to create the path /heron/topologies in Zookeeper because of the entry named localzk in this file; if we remove that entry we do not need to create that path. Now all we have to do is add a new entry for the Aurora cluster to this file (we will comment out localzk). The file should then look like the one below.

statemgrs:
  -
    type: "file"
    name: "local"
    rootpath: "~/.herondata/repository/state/local"
    tunnelhost: "localhost"
  # -
  #   type: "zookeeper"
  #   name: "localzk"
  #   hostport: "localhost:2181"
  #   rootpath: "/heron"
  #   tunnelhost: "localhost"
  -
    type: "zookeeper"
    name: "devcluster"
    hostport: "localhost:2181"
    rootpath: "/heronroot"
    tunnelhost: "localhost"

Now you can start the Heron Tracker and then the Heron UI. After that you will be able to see the Aurora cluster from the Heron UI ( http://192.168.33.7:8889/topologies ) as below
$ heron-tracker
$ heron-ui



Now you should have everything working. I hope this post was helpful. Leave a comment and share your thoughts, and if you notice any mistakes in the post please let me know; I will make sure to correct them.


Monday, June 13, 2016

Getting started with Heron stream processing engine in Ubuntu 14.04

I was trying to get started with Heron, a stream processing engine from Twitter, and faced some problems with the initial setup on Ubuntu. I am using Ubuntu 14.04, so these problems might not occur on other Ubuntu versions. The steps below simply follow the Heron documentation, but since I am working on Ubuntu I will only show the steps for Ubuntu.

Step 1.a : Download installation script files


You can download the script files that match Ubuntu from https://github.com/twitter/heron/releases/tag/0.14.0

For the 0.14.0 release the files you need to download will be the following.

heron-client-install-0.14.0-ubuntu.sh
heron-tools-install-0.14.0-ubuntu.sh

Optionally - you will not need the following for the steps in this blog post

heron-api-install-0.14.0-ubuntu.sh
heron-core-0.14.0-ubuntu.tar.gz

Step 1.b: Execute the client and tools shell scripts


$ chmod +x heron-client-install-VERSION-PLATFORM.sh
$ ./heron-client-install-VERSION-PLATFORM.sh --user
Heron client installer
----------------------

Uncompressing......
Heron is now installed!

Make sure you have "/usr/local/bin" in your path.

After this you need to add the path printed by the installer to your PATH ("/usr/local/bin" above; with the --user flag it may instead look something like "/home/username/bin"). You can either execute the following command or add it to the end of the .bashrc file (which is more convenient).

$ export PATH=$PATH:/usr/local/bin
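
As a quick sanity check (optional, but something I find useful), you can ask the Heron client for its version; if the command is not found, re-check the PATH entry above.

$ heron version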

Step 2 — Launch an example topology

$ heron submit local ~/.heron/examples/heron-examples.jar com.twitter.heron.examples.ExclamationTopology ExclamationTopology

This command will submit the example topology "ExclamationTopology" to your locally running cluster.

Step 3 — Start Heron Tracker and Step 4 — Start Heron UI

In this step the commands will start the Heron tracker and Heron UI.

$ heron-tracker
... Running on port: 8888
... Using config file: /Users/USERNAME/.herontools/conf/heron_tracker.yaml

$ heron-ui
... Running on port: 8889
... Using tracker url: http://localhost:8888
Now you can access the Heron UI at http://localhost:8889

Step 5 — Explore topology management commands

This is the step where I ran into some problems when trying this out for the first time on Ubuntu 14.04 LTS. This step shows you how to activate, deactivate and kill topologies. The following commands are used to do so.

$ heron activate local ExclamationTopology
$ heron deactivate local ExclamationTopology
$ heron kill local ExclamationTopology

But when I tried to execute these commands I got the following error.

java.nio.file.NoSuchFileException: ~/.herondata/repository/state/local/pplans/ExclamationTopology

After some googling I was able to figure out that this is due to some missing packages on Ubuntu, so installing the following packages did the trick for me.

$ sudo apt-get install git build-essential automake cmake libtool zip libunwind-setjmp0-dev zlib1g-dev unzip pkg-config -y

I hope this helps anyone who comes across the same issue.

Tuesday, June 7, 2016

Apache Hadoop MapReduce - Detailed word count example from scratch

In this post we will look at how to create and run a word count program in Apache Hadoop. To make it easy for a beginner we will cover most of the setup steps as well. Please note that this blog entry is for a Linux based environment; I am running Ubuntu 14.04 LTS on my machine. For Windows users the steps might be a little different; information about running Hadoop on Windows is available at Build and Install Hadoop 2.x or newer on Windows.

Prerequisites

1. Java needs to be installed (preferably a newer version such as 1.7 or 1.8)

Download Oracle JDK 8 from http://www.oracle.com/technetwork/java/javase/downloads/index.html, extract the archive to a folder named jdk1.8.0, and set the following environment variables (you can set them in the .bashrc file):
JAVA_HOME=<path to the extracted jdk1.8.0 folder>
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH

2. SSH. If you do not have ssh installed on your machine, use the following commands to install ssh and rsync, which is also needed

  $ sudo apt-get install ssh
  $ sudo apt-get install rsync 

3. Download and extract the latest Hadoop binary onto your machine. The latest Hadoop binaries are available at http://hadoop.apache.org/releases.html. The following commands will download and extract Hadoop version 2.7.2.

wget http://www-eu.apache.org/dist/hadoop/common/hadoop-2.7.2/hadoop-2.7.2.tar.gz
tar -xzvf hadoop-2.7.2.tar.gz
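
As a quick sanity check (assuming JAVA_HOME is set as in step 1), you can ask the extracted distribution to print its version before moving on:

cd hadoop-2.7.2    # or wherever you extracted it, e.g. ~/software/hadoop-2.7.2
./bin/hadoop version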

Word Count
Image reference - http://blog.enablecloud.com/2012/06/what-lies-at-core-of-hadoop.html



Word Count is a simple and easy to understand algorithm that can easily be implemented as a MapReduce application. Given a set of text documents, the program counts the number of occurrences of each word. The implementation consists of three main sections:


  1. Main Program
  2. Mapper
  3. Reducer
Writing the Mapper Class

The WordCountMapper class is created by extending the Mapper class, and the map function is implemented by overriding the map method of the Mapper class. The map function takes a key-value pair as input and emits key-value pairs as output (the output is emitted through the context object). The input key-value pair and the output key-value pairs do not need to be of the same types.

For instance, in WordCountMapper the input to the map method is a key-value pair where the key is the offset of the line within the input file and the value is the text of that line (line_offset, line_text). The mapper outputs (word, 1) for each word it reads in the line.

The pseudo code for the map function is below

void map(line_offset, line_text) {
     foreach word in line_text.split() {
            output(word, 1);
     }
}


The actual java code for the map function is below

public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

Writing the Reducer Class

The WordCountReducer class is created by extending the org.apache.hadoop.mapreduce.Reducer class, and the reduce method is implemented by overriding the reduce method of the Reducer class. The reduce function receives all the intermediate values generated for a given key by the map tasks, sums up the occurrences of each word, and outputs a key-value pair of the form (word, total count) for each word in the text documents. The detailed implementation of the WordCountReducer is below.

The pseudo code for the reducer function is below 

 void reduce(word, list(count)) {
        output(word, sum(count));
    }

The actual java code for the reducer function is below.


public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // sum up all the counts emitted for this word by the mappers (and the combiner)
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }


Writing Main Method

The main method sets up all the necessary configuration and runs the MapReduce job. It specifies the following:

    1. Job Name: the name of this job
    2. Executable (Jar) Class: the main executable class; here, WordCount
    3. Mapper Class: the class that overrides the "map" function; here, WordCountMapper
    4. Reducer Class: the class that overrides the "reduce" function; here, WordCountReducer
    5. Output Key: the type of the output key; here, Text
    6. Output Value: the type of the output value; here, IntWritable
    7. File Input Path
    8. File Output Path


public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCountMapper.class);
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

The complete code for the word count example is below.


    import java.io.IOException;
    import java.util.StringTokenizer;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf, "word count");
            job.setJarByClass(WordCount.class);
            job.setMapperClass(WordCountMapper.class);
            job.setCombinerClass(WordCountReducer.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }

        public static class WordCountMapper extends Mapper<Object, Text, Text, IntWritable> {

            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context) throws IOException,
                    InterruptedException {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens()) {
                    word.set(itr.nextToken());
                    context.write(word, one);
                }
            }
        }

        public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            private IntWritable result = new IntWritable();

            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable val : values) {
                    sum += val.get();
                }
                result.set(sum);
                context.write(key, result);
            }
        }

    }



Compiling the Code

In order to compile the code we need to create a .java file that contains our code. Execute the following commands and paste the code listed above into the file created by the vim command.


    cd ~/software/hadoop-2.7.2
    vim WordCount.java

There are several ways to generate a jar file from WordCount.java. The following is a simple and straightforward method. Move to the Hadoop directory and execute the following commands; the WordCount.java file also needs to be in the Hadoop directory.

    cd ~/software/hadoop-2.7.2
    export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
    ./bin/hadoop com.sun.tools.javac.Main WordCount.java
    jar cf wc.jar WordCount*.class

This will create a jar file that contains the compiled classes needed to run the program on Hadoop.

If you are familiar with Maven, a cleaner method is to create a Maven project for the WordCount example and simply run "mvn clean install", which will produce a jar file. You will need to add the following dependency to the pom.xml

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>
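
As a rough sketch of that approach (the project layout and the jar name below are assumptions of mine; the actual jar name depends on your pom.xml), the build and run would look something like this:

    # from the root of the Maven project
    mvn clean install
    # run the produced jar on Hadoop; replace the jar path with the one produced under target/
    cd ~/software/hadoop-2.7.2
    ./bin/hadoop jar /path/to/wordcount.jar WordCount ~/hadoop/wordcount/input ~/hadoop/wordcount/output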


Running the Code.

Next we will run the example on a local standalone Hadoop node. Before we run the example we need to create a set of input files to feed the program.

First create a directory to put all the input files in; the program will read every file in this folder. Use the following commands to create the directories and files.

    mkdir -p ~/hadoop/wordcount/input
    cd ~/hadoop/wordcount/input
    vim 1.txt
    vim 2.txt

Create the two text files 1.txt and 2.txt under this folder with the following contents.

1.txt - Hello World Bye World

2.txt - Hello Hadoop Goodbye Hadoop

To run the MapReduce job, execute the following command from the Hadoop directory.


  cd ~/software/hadoop-2.7.2
  ./bin/hadoop jar wc.jar WordCount ~/hadoop/wordcount/input ~/hadoop/wordcount/output

After the job has completed, execute the following command to check the output that was generated.

   cat ~/hadoop/wordcount/output/part-r-00000
    Bye 1
    Goodbye 1
    Hadoop 2
    Hello 2
    World 2

I hope this blog post helped you better understand how Hadoop MapReduce works. Let me know if there are improvements I can make to this blog, or if I got something wrong.
