How to use Apache Spark for Business Analytics (Scala, RDDs)

The world of big data is pivoting away from the legacy MapReduce programming model on Hadoop towards DAG (Directed Acyclic Graph) based, in-memory computation frameworks for business analytics. There are a number of players in this space, such as Storm and Spark, but Spark stands out from the rest for several reasons, among them its growing ecosystem of libraries for relational data processing (Spark SQL), distributed machine learning (MLlib) and graph processing (GraphX). This post isn’t meant to explain how Spark works internally; that is a comprehensive topic in its own right, with many facets to understand before one can appreciate how this computation framework operates at scale. What this post will give you instead is a walk-through of how to use Apache Spark’s Scala RDD API to perform business analytics and derive insights.

When it comes to processing structured data (e.g. delimited files) in big data environments, we data scientists have a number of tools at our disposal to squeeze out insights (e.g. Hive, Impala, HAWQ). In Spark specifically, structured data can be processed with the widely known Spark SQL library, which provides a tabular abstraction in the form of DataFrames and works in conjunction with the Catalyst optimizer. Under the hood, Spark SQL DataFrames are also converted to RDDs (the basic immutable, partitioned abstraction that Spark provides). However, data may not always be so sweetly structured and conducive to Spark SQL, and you may find yourself in a position where you need to use Spark RDDs directly to perform data analytics. That’s where prowess in the rather low-level Spark RDD API comes in handy.

In this post, we are going to work with a data-set that’s available in the Cloudera QuickStart VM. The data-set represents a fictitious retail company and contains a number of tables:

  • Customers
  • Orders
  • Order Items
  • Departments
  • Categories
  • Products

Here’s what the data model looks like:

Source: Cloudera https://www.cloudera.com/developers/get-started-with-hadoop-tutorial/exercise-1.html

 

If you have access to the Cloudera QuickStart VM, this data-set is available there in MySQL. To import it, you can use either Sqoop or Spark SQL.
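For instance, here is a minimal Spark SQL sketch of pulling the orders table over JDBC and writing it out as comma-delimited text. It assumes a Spark 2.x spark-shell (where the spark session variable is already available), that the MySQL JDBC driver is on the classpath, and the usual QuickStart connection details (host quickstart, database retail_db, user retail_dba, password cloudera); adjust all of these to your environment.

// A sketch, not a definitive recipe: read the orders table over JDBC and
// persist it as comma-delimited text so it can later be read as an RDD.
// Host, database name and credentials are assumptions (QuickStart defaults).
val ordersDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://quickstart:3306/retail_db")
  .option("dbtable", "orders")
  .option("user", "retail_dba")
  .option("password", "cloudera")
  .load()

ordersDF.write.csv("/user/ielahi/sample_retail_data/orders")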

This is a nice and handy data-set, so let’s proceed to see how we can derive some interesting insights from it using Apache Spark. First things first:

Requirements:

  • You have Apache Spark running (locally, in standalone mode, or on a cluster managed by YARN, Mesos or another cluster manager)
  • You have some basic knowledge of Scala

Following the oft-quoted top-down approach, we will commence our analytics process with a business question and then see whether we have the data assets to perform such an analysis.

On which day were the most orders processed?

From this question, it can be inferred that it’s mainly about the Orders table. If we look at the Orders table (as per the diagram above), we can see that it has a date field which can potentially be used to answer this business question.

Loading data from files in Apache Spark RDDs using Scala:

Firstly, you will need to bring data into the Spark environment. When you bring external data into Spark, it gets loaded in the form of RDDs (think of an RDD as a distributed collection, like an Array, but with a lot of cool characteristics: it is immutable and partitioned across multiple worker nodes as per a partitioner function). In our case, the data-set is made available as text files (in which fields are delimited by commas and each new line represents a new record), so we can use the Spark context’s famous textFile function (a convenience function that uses Hadoop’s TextInputFormat to read data) to load the data into Spark as follows (assuming that you are in spark-shell, where the Spark context is already provided as the variable sc):

 
val rdd_1 = sc.textFile("/user/ielahi/sample_retail_data/orders/*")
// specified * at the end to load all files in the folder into this RDD

Courtesy of Spark’s lazy execution model, it hasn’t actually executed anything yet. Rather, at the back-end, it has started weaving a DAG which will be “played” as soon as you call any action on this RDD.
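If you’re curious about what has been built up so far, you can (as a quick, optional check) print the RDD’s lineage:

// Inspect the lineage (DAG) recorded so far -- nothing gets executed by this.
// The exact output varies by Spark version, but it typically shows a
// MapPartitionsRDD sitting on top of the HadoopRDD created by textFile.
println(rdd_1.toDebugString)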

For example, to actually trigger execution and see how many rows there are in this data-set:

rdd_1.count // count is an action that triggers the execution of the DAG and returns the number of elements in the RDD (return type: Long) to the driver program. In this case, it returned 68883.

Before diving into analysis, it’s good to do some quick profiling of your data (in fact you should do extensive profiling, such as checking for nulls and other anomalies, so as to assess how clean your data is and to plan your subsequent data wrangling; we aren’t covering that fully in this post). Let us quickly see if there are any duplicate rows in our data-set:

val duplicates = if (rdd_1.count == rdd_1.distinct.count) false else true

The beauty of Scala is that almost everything is an expression that returns a value, so here the if expression evaluates to false when the two counts match (i.e. there are no duplicates) and to true otherwise, and the result is stored in the variable “duplicates”. Within the condition, we use the “count” action along with the “distinct” transformation, which generates a new RDD with duplicates removed, and compare the two counts. In our case, the duplicates variable is false, meaning there aren’t any duplicate rows in the data-set.
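As a small taste of the profiling mentioned above (a minimal sketch, not the extensive checks we’re skipping), we can also count records that don’t have exactly 4 comma-separated fields, since the parsing we do later in this post assumes that layout:

// Count records that don't split into exactly 4 fields; a non-zero count
// would mean the field parsing done later needs defensive handling.
val malformedCount = rdd_1.filter(line => line.split(",").length != 4).count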

It’s always helpful to take a sneak peek at your data to see how it actually looks. Now, one of the most common mistakes that big data folks make is to just issue the “collect” action on an RDD, which returns all of the data from all of the worker nodes to the driver program in the form of a Scala collection. If your data-set is huge, this can cause serious memory issues because the driver’s JVM memory is limited and can hardly hold data that was previously distributed across the whole cluster. So a safer bet is the “take” action, which returns only the number of elements you specify in its argument to the driver program, like:

rdd_1.take(10).foreach(println)

1,2013-07-25 00:00:00.0,11599,CLOSED
2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT
3,2013-07-25 00:00:00.0,12111,COMPLETE
4,2013-07-25 00:00:00.0,8827,CLOSED
5,2013-07-25 00:00:00.0,11318,COMPLETE
6,2013-07-25 00:00:00.0,7130,COMPLETE
7,2013-07-25 00:00:00.0,4530,COMPLETE
8,2013-07-25 00:00:00.0,2911,PROCESSING
9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT
10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT

The “take” action returns its results as a Scala collection (an Array), and we then call the “foreach” function on that array to print each element with println in a fairly clean format.

So we can see that our records are comma-delimited and there are 4 fields (which corresponds with the data model diagram shared above). We can also infer the data types of the fields from the output (i.e. the first field seems to be all integers, the second field represents a timestamp, and so on).

Creating RDD of Tuples (and Key Value Pair RDDs) in Apache Spark:

There are different types of RDDs, and among the most extensively used are pair RDDs, i.e. RDDs where each element is a Scala tuple in which the first element represents the key and the second the value. Pair RDDs can be used to implement the map reduce programming model in Spark quite efficiently, instead of relying on Hadoop’s rather dated implementation. Also, a number of functions are exclusive to pair RDDs, e.g. join, reduceByKey, sortByKey and groupByKey.
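To make the pair RDD idea concrete before we touch the real data, here’s a tiny sketch on made-up toy data (the category names and numbers are purely illustrative):

// A toy pair RDD: each element is a (key, value) tuple.
val sales = sc.parallelize(Seq(("apparel", 100), ("fitness", 50), ("apparel", 70)))

// reduceByKey is only available on pair RDDs: it sums the values per key,
// yielding ("apparel", 170) and ("fitness", 50).
val totals = sales.reduceByKey(_ + _)

// join is another pair-RDD-only operation: it matches keys across two RDDs,
// yielding ("apparel", (170, 200)) and ("fitness", (50, 60)).
val targets = sc.parallelize(Seq(("apparel", 200), ("fitness", 60)))
totals.join(targets).collect.foreach(println)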

When you use sc.textFile to load data from an external file, what you get is an RDD of Strings, i.e. each element in the RDD is of type String. An RDD can store any Java/Scala/Python object (for Scala/Java the object has to be serializable, though). So to create an RDD of tuples (or a pair RDD, which is a specific case of it), you need to apply a series of transformations to convert the RDD of Strings into an RDD of tuples.

We’ll defer the creation of pure pair RDDs for now and, to keep things simple, create an RDD of tuples from our RDD of Strings instead.

Here’s how it can be done:

val orders_kv = rdd_1.map(x => x.split(",")).map(x => (x(0).toInt, x(1), x(2).toInt, x(3)))

Now, to quickly see what type of RDD you obtained with the previous expression:

orders_kv.take(3)

res6: Array[(Int, String, Int, String)] = Array((1,2013-07-25 00:00:00.0,11599,CLOSED), (2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT), (3,2013-07-25 00:00:00.0,12111,COMPLETE))

So you can see that the RDD’s elements are now tuples of type (Int, String, Int, String).

Let us quickly see what the maximum and minimum order id values are (i.e. the first column). This will be helpful when there is a need to join this data-set with the order_items table.

orders_kv.map(x=>x._1).max
res7: Int = 68883

orders_kv.map(x=>x._1).min
res8: Int = 1

So, using map, we transformed our RDD of tuples to access just the first element of each tuple, i.e. the id (using Scala’s x._1 syntax), and then applied the max and min functions to get the maximum and minimum values.
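As an aside, if all you want is a quick numeric summary, Spark’s stats() action (available on RDDs of Double) computes count, mean, standard deviation, max and min in a single pass over the data:

// One-pass summary of the order ids; stats() returns a StatCounter whose
// min and max should match the values obtained above.
val idStats = orders_kv.map(_._1.toDouble).stats()
println(idStats)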

Now this brings us to our main question:

How to Sort and Group Data in Apache Spark RDDs to Find the Days with the Most Order Entries?

To do this, we can use the following chain of transformations and actions:

orders_kv.map(x=>(x._2,1)).reduceByKey((x,y)=>(x+y)).sortBy(x=>x._2,false).take(10).foreach(println)

which gives us the following output:

(2013-11-03 00:00:00.0,347)
(2013-11-24 00:00:00.0,292)
(2013-10-04 00:00:00.0,287)
(2013-11-14 00:00:00.0,287)
(2013-12-26 00:00:00.0,286)
(2014-07-20 00:00:00.0,285)
(2014-01-11 00:00:00.0,281)
(2014-02-01 00:00:00.0,278)
(2013-11-05 00:00:00.0,278)
(2013-09-25 00:00:00.0,277)

In this step, we applied a chain of transformations and actions:

  • map to create a tuple of (timestamp, 1), thereby making the timestamp the “key”
  • reduceByKey to apply a sum aggregation over the values of each key
  • sortBy to sort the results by the value (the order count), in descending order (the false argument)
  • take(10) to bring the first 10 elements of the RDD to the driver, and then foreach on the returned Scala collection to print each element

All possible thanks to Scala’s amazing functional programming constructs.
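As a side note, the same top-10 result can be obtained without sorting the entire aggregated RDD, by using the top action with an Ordering on the count (a sketch of an alternative, not something the pipeline above requires):

// top(10) keeps only the 10 largest elements (by order count here), which
// avoids a full sort of the aggregated RDD; results come back in descending order.
orders_kv.map(x => (x._2, 1))
  .reduceByKey(_ + _)
  .top(10)(Ordering.by[(String, Int), Int](_._2))
  .foreach(println)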

Thus, in this blog post, we set the context and described how to use Apache Spark RDDs in Scala to do data analysis. Hope that was helpful. There is a lot more we can do with these APIs, and I’ll be covering that in coming posts. Cheers and stay connected.

#ExcellenceMatters