Simple log analysis with Apache Spark

In this post we will learn to run some simple Spark commands, covering a few transformations and actions, by doing some simple log analysis. I will be using the local Spark cluster that I set up on my laptop. If you haven't read my first post on how to set up a Spark cluster on your local machine, I recommend you read it: How to set up a Apache Spark cluster in your local machine. First, we connect to the cluster with the following command.

MASTER=spark://pulasthi-laptop:7077 ./spark-shell

"spark://pulasthi-laptop:7077" is the URL of the master, which can be found in the Spark web UI. After connecting successfully you should see a Scala console where you can execute commands. You should also see your application listed in the web UI under running applications. To run these scenarios I am using a set of log files generated from various WSO2 products as sample data. Any set of log files, or even just a set of text files, should suffice for this purpose. I have around 800 MB of log files in a single folder, and I will be running the commands on this data set.

Spark has a concept called RDDs, or Resilient Distributed Datasets. I am not going to go into detail about what RDDs are, since that is a separate topic of its own. Simply understand that any transformation done to data in Spark results in an RDD, which can be persisted (if wanted) and reused. If you want to learn about RDDs in depth, there is a great paper on the subject: Resilient Distributed Datasets.

So first let's create our first RDD from the log files we have. A point to remember is that Spark evaluates lazily: even though we create an RDD from the log files, the data will not be pulled into memory until we perform some action on it. We are using Spark's interactive Scala shell, so all the commands are Scala. A predefined variable "sc", which is a SparkContext, is available, and you can see the methods available on it using tab completion (type sc. and press Tab).

val logs = sc.textFile("/home/pulasthi/work/logs")

The command above will create an RDD from the text files in the given path. You will see something similar to the following printed in the terminal after executing the command.

logs: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
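At this point nothing has been read from disk yet. As a rough analogy only, the same deferred behaviour can be seen in plain Scala with a lazy iterator, no Spark required. The sample lines and the names lines, inspected, errorLines and errorCount are made up for this sketch.

```scala
// Made-up lines standing in for a log file (illustration only)
val lines = Seq("INFO start", "ERROR disk full", "INFO stop")
var inspected = 0 // counts how many lines the predicate has looked at

// Building the filtered iterator does no work yet,
// much like a transformation on an RDD
val errorLines = lines.iterator.filter { line =>
  inspected += 1
  line.contains("ERROR")
}
println(s"after filter: inspected = $inspected")

// Consuming the iterator forces the work, much like an action on an RDD
val errorCount = errorLines.size
println(s"after size: inspected = $inspected, errorCount = $errorCount")
```

The first println reports that no lines have been inspected; only after size is called have all three lines been examined, with one match. An RDD behaves similarly in spirit: textFile and filter only describe the work, and an action such as count carries it out.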

Now let's perform an action on the RDD that we just created. Let's do a simple count.

logs.count

After some time you should get a count of the total number of lines in the logs. On the first run the data is read directly from the file system, since nothing has been loaded before this point. We will run the same query twice and see what happens. The time spent on the operation is displayed in the console itself. If you click the link "Spark shell" in the master's web UI, you can follow the progress of each task there.

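From the screenshot above you can see the details of the two runs of the same task: it took 40.7s to complete on the first run and only 8.6s on the second. This huge difference is because on the first run the data was loaded from disk, while on the second run it was already in RAM. Note that Spark itself keeps an RDD in memory only if you ask it to with cache() or persist(); since we did not do that here, the speedup most likely comes from the operating system's file cache.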

Note: Much higher performance can be gained when running on an actual cluster; the numbers are a bit high since I am running everything on my laptop.
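To have Spark itself hold the data in memory between runs, the RDD can be marked for caching before the first action. This is a minimal sketch, assuming it is typed into the same spark-shell session where sc and logs are already defined; how much of the data actually stays cached depends on the memory available to the workers.

```scala
// Assumes the spark-shell session above, where sc and logs already exist
logs.cache()     // marks the RDD for in-memory storage; nothing is loaded yet

logs.count       // first action: reads the files and fills the cache
logs.count       // later actions reuse the cached partitions that fit in memory

// Release the memory once the RDD is no longer needed
logs.unpersist()
```

cache() is itself lazy, so the first count still pays the cost of reading from disk; the benefit shows up from the second action onwards.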

Now let's filter the logs a bit. Let's say we need to extract some specific data from the log files, and we know it is only contained in the entries logged as ERROR. Simply put, we want to analyse some of the errors. So instead of running the query on the logs RDD, we will create an errors RDD that contains only the ERROR lines of the log. The benefit of doing this is that future tasks only need the smaller errors RDD to reside in memory. We will execute the following command to create the errors RDD.



 val errors = logs.filter(_.contains("ERROR"))

Again, since evaluation is lazy, the errors RDD will not be computed yet. Now I want to extract the lines that contain the value "SequenceAdmin". The following command will do just that for us.

 val seqad = errors.filter(_.contains("SequenceAdmin"))

Now try out some actions on the newly created RDD "seqad", for example:
  • seqad.count - will give you the number of lines that contain "SequenceAdmin".
  • seqad.first - will print the first entry in the RDD.
  • seqad.take(10) - will give you the first 10 entries.
After the first action, the "errors" RDD will be kept in memory only if it has been marked with errors.cache() (or persist()), and even then Spark may evict it to make room for new RDDs. With it cached, subsequent tasks will be performed much faster than the first one.
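To check what the two filters select without waiting on a large data set, the same predicates can be tried on a handful of lines in a plain Scala collection. This is only a sketch: the sample lines and the names sample, sampleErrors and sampleSeqad are made up for illustration and are not taken from the real logs.

```scala
// Made-up sample lines for illustration; not taken from real WSO2 logs
val sample = Seq(
  "INFO  Server started",
  "ERROR SequenceAdmin failed to deploy sequence",
  "ERROR Connection refused",
  "WARN  SequenceAdmin slow response",
  "ERROR SequenceAdmin sequence not found"
)

// Same predicates as the errors and seqad filters above
val sampleErrors = sample.filter(_.contains("ERROR"))
val sampleSeqad = sampleErrors.filter(_.contains("SequenceAdmin"))

println(sampleSeqad.size)     // number of lines matching both filters
println(sampleSeqad.head)     // first matching line
println(sampleSeqad.take(10)) // up to the first 10 matching lines
```

The WARN line mentions SequenceAdmin but is dropped by the first filter, which is why the order of the two filters does not matter for the result, only for how much data the second one has to scan. In the Spark shell, sc.parallelize(sample) would turn the same lines into a small RDD that accepts the original commands unchanged.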

I hope this helps someone who is new to Spark and trying to learn it. I hope to write further posts as I work more with Spark.

