Tuesday, November 5, 2013

Simple log analysis with Apache Spark

In this post we will try to learn to run some simple commands with Spark, some simple transformations and actions. We will do some simple log analysis using Spark. I will be using the local Spark cluster that i setup on my laptop. if you haven't read my first post on how to setup an Spark cluster on your local machine i recommend you read the post How to set up a Apache Spark cluster in your local machine. First we will connect the the cluster with the following command.

MASTER=spark://pulasthi-laptop:7077 ./spark-shell

"spark://pulasthi-laptop:7077" is the URL of the master that can be found in the Spark web ui. After connecting successfully you should be able to see an Scala console where you can execute commands. Also you should be able to see your application listed in the web-ui under running applications. To run this scenarios i am using a set of log files generated from various WSO2 products as sample data. Any set of log files or even just a set of text file should suffice for this purpose. I have around 800Mb of log files in a single folder. and i will be running the commands on this data set. 

Spark has a concept called RDD or Resilient Distributed Data-sets. I am not going to go into details about what RDD's are since its a completely separate topic of its own. Simple understand that any transformation done to data in Spark will results in an RDD which can be persisted ( if wanted ) and used. If you want to learn about RDD's in depth there is a great paper on that just check it out Resilient Distributed Datasets.

So first lets create our first RDD from the log files we have. A point to remember is that Spark uses lazy loading hence even though we create an RDD with the log files data will not be pulled into memory until we so some action on the data. We are using the Spark’s interactive Scala shell so all the commands are Scala. A predefined variable "sc" or "SparkContext" is available and you can see the methods that are available with it using the tab

var logs = sc.textFile("/home/pulasthi/work/logs");

The command above will create an RDD with the text files in the path given. you will somthing similar to the follwing printed in the terminal after executing the command.

logs: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at :12

Now lets do a action on the RDD that we just created. Lets just do a simple count 


After some time you should get a count of the total number of lines of the logs. on the first run the data will be directly accessed from the file system since Spark uses lazy loading. we will run the same query twice and see what happens. The time spent for the operation will be displayed on the console it self. If you click in the link "Spark shell" on the web ui of the master you will be able to see the progress of each task through the web ui.

From the screen shot above you can see the details of the two runs of the same task as you can see it took 40.7s to complete it on the first run and only 8.6s on the second run. This huge difference is because on the first run data was loaded from the disk and on the second run data was in the RAM.

Note: Much higher performance can be gained when when running on an actual cluster the numbers are a bit high since i am running everything on my laptop.

Now lets filter the logs a bit. lets say we only need to extract some specific data from the log files and we know they are only contained in the areas logged as ERROR in the log files. Simply put we want to analyse some of the errors. So instead of running the query on the logs RDD we will create a errors RDD that only contain the ERROR sections of the log. The benefit of doing this is that for future tasks we will only need the smaller errors RDD to reside in memory. We will execute the following command to create the errors RDD.

 val errors = logs.filter(_.contains("ERROR"))

Again since lazy loading is done the errors RDD will not be created yet. Now i want to extract the lines that have a value called "SequenceAdmin" within the line. The following command will do just that for us.

 val seqad = errors.filter(_.contains("SequenceAdmin"))
Now try out some actions on the newly created RDD "seqad" for example
  • seqad.count - will give you the number of lines "SequenceAdmin" was contained in.
  • seqad.first - will print the first entry in the RDD.
  • seqad.take(10) will give you the first 10 entries.
After the first action the "errors" RDD will be kept in memory unless the its removed to gain space for new RDD's. And subsequent tasks will be performed much faster than the first one. 

Hope this will help someone who is trying to learn spark and is new to spark. hope to write further posts as i work more with Spark.

Saturday, November 2, 2013

How to set up a Apache Spark cluster in your local machine

The past few days i grew some interest in Apache Spark and thought of playing around with it a little bit. If you haven't heard about it go an take a look its a pretty cool project it claims to be around 40x faster than Hadoop in some situation. The incredible increase in performance is gained by leveraging in-memory computing technologies. I want go into details about Apache Spark here if you want to get a better look at Spark just check out there web site - Apache Spark.

In this post we will be going through the steps to setup an Apache Spark cluster on your local machine. we will setup one master node and two worker nodes. If you are completely new to Spark i recommend you to go through First Steps with Spark - Screencast #1 it will get you started with spark and tell you how to install Scala and other stuff you need.

We will be using the  launch scripts that are provided by Spark to make our lives more easier. First of all there are a couple of configurations we need to set.


When using the launch scripts this file is used to identify the host-names of the machine that the slave nodes will be running. All you have to do is provide the host names of the machines one per line. since we are setting up everything in our machine we will only need to add "localhost" to this file.


There are a set of variables that you can set to override the default values. this can be done by putting in values in the "spark-env.sh" file. There is a template available "conf/spark-env.sh.template" you can use this template to create the spark-env.sh file. Several variable that can be added is mentioned in the template is self. we will add the following lines to the file.

export SCALA_HOME=/home/pulasthi/work/spark/scala-2.9.3
export SPARK_WORKER_DIR=/home/pulasthi/work/sparkdata

Here SPARK_WORKER_MEMORY specifies the amount of memory you want to allocate for a worker node if this value is not given the default value is the total memory available - 1G. Since we are running everything in our local machine we woundt want the slave the use up all our memory. I am running on a machine with 8GB of ram and since we are creating 2 slave node we will give each of the 2GB of ram.

The SPARK_WORKER_INSTANCES specified the number of instances here its given as 2 since we will only create 2 slave nodes.

The SPARK_WORKER_DIR will be the location that the run applications will run and which will include both logs and scratch space. Make sure that the directory can be written to by the application that is permission are set properly.

After we have these configurations ready we are good to go. now lets start by running the master node.
Just execute the launch script for the master that is "start-master.sh"


Once the master is started you should be able to access the web ui at http://localhost:8080.

Now you can proceed to start the slaves. This can be done by running the "start-slaves.sh" launch script.

Note: In order to start slaves the master need to be able to access the slave machines through ssh. since we are running on the same machine that is your machine should be accessible through ssh. make sure you have ssh installed run "which sshd". if you don't have it installed install it with the following command.

 sudo apt-get install openssh-server

You will also need to specify an password for the root since this will be requested when running the slaves. If you do not have a root password set use the following command to set an password.

sudo passwd

With the slaves successfully started now you have a Spark cluster up and running. If everything went according to plan the web-ui for the master should show the two slave nodes.

Now lets connect to the cluster from the interactive shell by executing the following command

MASTER=spark://IP:PORT ./spark-shell
You can find the IP and the PORT in the top left corner of the web ui for the master. When successfully connected the web ui will show that there is an active task.

Hope to write more posts regarding Spark in the future. if you want to learn a bit more about Spark there is some great documentations on the Spark site is self here. Go an check it out.


Amazon Deals