Friday, April 18, 2014

Simple log analysis with Apache Spark Java API

In this post we will redo the sample from my previous post, Simple log analysis with Apache Spark, this time using the Spark Java API. Since I am more accustomed to Maven, we will create a simple Maven project for the task. I will not use a separate Spark cluster for this post; we will just use an embedded Spark that starts up automatically when the code runs.

First, let's create a new Maven project. I am using IntelliJ IDEA, and I will name my project LogAnalizer. The first thing you need to do is add the Maven dependencies to your pom file: add the following lines to the pom.xml file in your newly created Maven project. You will also need to add the repository.



<repositories>
        <repository>
            <id>Akka repository</id>
            <url>http://repo.akka.io/releases</url>
        </repository>
</repositories>         
<dependencies>
        <dependency> <!-- Spark dependency -->
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>0.9.1</version>
        </dependency>
</dependencies>

Now let's take a look at the Java class, which is where all the magic happens. Since this is a very basic example, I will only explain the main parts of the code. The code reads a set of log files that I have collected into one folder. It first filters them to create an RDD that contains only the "ERROR" lines of the logs, and then filters those error lines further to find the ones that contain a specific string, in this case "SequenceAdmin".

Below is the code:


package org.pulasthi.spark;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.List;

public class LogAnalizer {
    private static String logFolderPath = "/home/pulasthi/work/logs";

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local[4]", "Log Analizer", "/home/pulasthi/work/spark/spark-0.9.1/", new String[]{"target/logAnalizer-1.0-SNAPSHOT.jar"});
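        // Read every file in the log folder as an RDD of lines
        JavaRDD<String> logs = sc.textFile(logFolderPath);

        // Keep only the lines that contain "ERROR"
        JavaRDD<String> errors = logs.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("ERROR");
            }
        });

        // From the error lines, keep only those that mention "SequenceAdmin"
        JavaRDD<String> sequenceAdmin = errors.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("SequenceAdmin");
            }
        });

        // Print the first 10 matching lines
        List<String> list = sequenceAdmin.take(10);
        for (String s : list) {
            System.out.println(s);
        }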
    }

}

First we need to initialize a Spark context; this is done in the first line of the main method. The four constructor arguments are the master URL (here "local[4]", which runs Spark locally with four threads), the application name, the Spark home directory and the jars to ship with the job. To get a better understanding of these parameters, take a look at the official documentation, the Spark Programming Guide. It is written for Scala, but the API is practically the same in Java, so you don't need to know Scala to understand it. In this code we apply two filters to create the "errors" RDD and the "sequenceAdmin" RDD, and finally we print the first 10 results in the "sequenceAdmin" RDD. You can also take a look at the Java documentation, the Java Programming Guide.
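If you want to check the two filter conditions on a few lines before running them through Spark, the same chain can be sketched in plain Java. This is only a rough stand-in and not Spark code: the class LogFilterSketch, its helpers keepContaining and firstN, and the sample log lines are all made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of the LogAnalizer project or of Spark.
class LogFilterSketch {

    // Keeps only the lines that contain the given text,
    // mirroring what each filter(...) call does per element.
    static List<String> keepContaining(List<String> lines, String text) {
        List<String> kept = new ArrayList<String>();
        for (String line : lines) {
            if (line.contains(text)) {
                kept.add(line);
            }
        }
        return kept;
    }

    // Returns at most the first n elements, similar in spirit to take(n).
    static List<String> firstN(List<String> lines, int n) {
        return new ArrayList<String>(lines.subList(0, Math.min(n, lines.size())));
    }

    public static void main(String[] args) {
        // Made-up sample lines; real log files will look different.
        List<String> logs = Arrays.asList(
                "2014-04-18 10:00:01 INFO  Server started",
                "2014-04-18 10:00:02 ERROR SequenceAdmin failed to deploy",
                "2014-04-18 10:00:03 ERROR Connection refused",
                "2014-04-18 10:00:04 WARN  SequenceAdmin is slow");

        List<String> errors = keepContaining(logs, "ERROR");
        List<String> sequenceAdmin = keepContaining(errors, "SequenceAdmin");

        for (String s : firstN(sequenceAdmin, 10)) {
            System.out.println(s);
        }
    }
}
```

With these sample lines only the second line is printed, since it is the only one that contains both "ERROR" and "SequenceAdmin".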

pulasthi
