How to use Apache Spark for Business Analytics (Scala/RDDs) – Part 2

In the previous post, we started off by exploring how we can make use of Apache Spark for business analytics and answered a couple of business analytics questions using Apache Spark’s RDD APIs in Scala. In this tutorial, we are going to further explore how we can use Spark’s RDD APIs in Scala to answer a few more interesting questions and generate insights.

Let’s start off by considering the following business question:

Which customer has the most orders with status as complete?

and see if we can answer it from the data-sets that we have in place. Referring again to the diagram of the data-model:

Source: https://www.cloudera.com/content/dam/www/static/images/screenshots/tutorial/exercise1-f1.png

Focusing specifically on the “orders” table, we can see that it has customer_id and order_status fields that we can use to answer this question. So let’s load that data-set in Spark and create an RDD of tuples (converting order_id and customer_id to Int so they can be treated as numbers):

scala> val orders_kv=sc.textFile("/user/ielahi/sample_ds/retail_data/orders/*").map(x=>x.split(",")).map(x=>(x(0).toInt,x(1),x(2).toInt,x(3)))

and, just to get a sense again of what the data looks like:

scala> orders_kv.take(3)
 res6: Array[(Int, String, Int, String)] = Array((1,2013-07-25 00:00:00.0,11599,CLOSED), (2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT), (3,2013-07-25 00:00:00.0,12111,COMPLETE))

 

So our elements of interest in this RDD of tuples are the 3rd and 4th ones.

Our business question can be answered by applying the following series of transformations and actions:

scala> orders_kv.filter(x=>x._4=="COMPLETE").map(x=>(x._3,1)).reduceByKey(_+_).sortBy(x=>x._2,false).take(30).foreach(println)

Upon executing this, we get the following output:
 (9337,10)
 (749,9)
 (3710,9)
 (7802,9)
 (11061,8)
 (2469,8)
 (221,8)
 (5283,8)
 (5186,8)
 (7910,8)
 (5033,7)
 (12051,7)
 (7177,7)
 (3303,7)
 (9325,7)
 (7981,7)
 (4849,7)
 (8861,7)
 (173,7)
 (363,7)
 (3979,7)
 (3763,7)
 (4568,7)
 (10876,7)
 (8314,7)
 (4116,7)
 (5004,7)
 (12242,7)
 (6484,7)
 (5600,7)

 

meaning that the customer with customer id 9337 has the highest number of orders with status COMPLETE.

Let’s revisit how we used Apache Spark’s RDD APIs in Scala to answer this business question:

  • We first applied the RDD’s filter transformation to keep just those elements of the RDD whose order_status field is COMPLETE. As we have an RDD of tuples of the form (Int, String, Int, String) and order_status is the 4th element of each tuple, we accessed it using the notation x._4 and used the condition x._4=="COMPLETE". Only the records which satisfy this condition were returned by this transformation.
  • Once done, we applied a map transformation to form a Key-Value RDD (which actually is an RDD of tuples of the form (Int, Int)) where we make customer_id, i.e. the 3rd element of the tuple (x._3), the key and 1 the value. At this point, each element of the RDD looks something like (customer_id,1), e.g. (9337,1), (5600,1), (6484,1), (9337,1) and so on.
  • This transformation is then instrumental in applying a reduceByKey transformation, where we apply a commutative and associative summation function to the values of the same key. Thus, for all Key-Value pairs with the same key, the values get added; as each value is 1, it acts as a counter for us.
  • Then we apply the sortBy transformation to sort the result. We specify the second element of the Key-Value (tuple) RDD as the sort key, i.e. we sort on the value and not on the key, and we also pass false to sort in descending order.
  • Finally, we call the take action to return the top 30 elements of our Key-Value pair RDD and then call foreach on the returned Scala collection to print each element on the screen. (A step-by-step version of this pipeline is sketched right after this list.)
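For readability, here is the same pipeline broken into named intermediate steps. This is just a sketch of the chained command above split apart (the intermediate variable names are ours, purely illustrative), assuming the orders_kv RDD defined earlier:

val completedOrders = orders_kv.filter(x => x._4 == "COMPLETE")   // keep only orders with status COMPLETE
val customerOnes = completedOrders.map(x => (x._3, 1))            // emit (customer_id, 1) per completed order
val orderCounts = customerOnes.reduceByKey(_ + _)                 // sum the 1s to get a count per customer_id
val sortedCounts = orderCounts.sortBy(x => x._2, false)           // sort by the count, descending
sortedCounts.take(30).foreach(println)                            // print the top 30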

Let’s answer the second question:

From which states did the retail company get the most orders?

To answer this question, we will have to load the customers data-set, which contains fields like state. So:

scala> val customers=sc.textFile("/user/ielahi/sample_ds/retail_data/customers/*")
customers: org.apache.spark.rdd.RDD[String] = /user/ielahi/sample_ds/retail_data/customers/* MapPartitionsRDD[145] at textFile at <console>:31

As a best practice, let’s quickly take a glance at what our data looks like:

scala> customers.take(3)

res57: Array[String] = Array(1,Richard,Hernandez,XXXXXXXXX,XXXXXXXXX,6303 Heather Plaza,Brownsville,TX,78521, 2,Mary,Barrett,XXXXXXXXX,XXXXXXXXX,9526 Noble Embers Ridge,Littleton,CO,80126, 3,Ann,Smith,XXXXXXXXX,XXXXXXXXX,3422 Blue Pioneer Bend,Caguas,PR,00725)

 

Let’s convert this RDD of Strings to an RDD of tuples so that we can conveniently manipulate it for our processing:

 

scala> val customers_kv=customers.map(x=>x.split(",")).map(x=>(x(0).toInt,x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8)))
customers_kv: org.apache.spark.rdd.RDD[(Int, String, String, String, String, String, String, String, String)] = MapPartitionsRDD[147] at map at <console>:33

Thus, to answer our question, we can use a similar approach as follows:

scala> customers_kv.map(x=>(x._8,1)).reduceByKey(_+_).sortBy(x=>x._2,false).take(5).foreach(println)
 (PR,4771)
 (CA,2012)
 (NY,775)
 (TX,635)
 (IL,523)

From this we can see that PR tops the list, followed by CA, NY, TX and IL. Strictly speaking, since the chain above only uses the customers data-set, it counts customers per state rather than orders per state; answering the question in terms of actual orders would require joining the customers data-set with the orders data-set on customer_id.
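As a rough sketch of that join (assuming the same file paths and field positions used above; the variable names are ours and purely illustrative), we could key both data-sets by customer_id and join them:

// (customer_id, 1) for every order
val ordersByCustomer = sc.textFile("/user/ielahi/sample_ds/retail_data/orders/*").map(x => x.split(",")).map(x => (x(2).toInt, 1))

// (customer_id, state) for every customer
val customersByState = sc.textFile("/user/ielahi/sample_ds/retail_data/customers/*").map(x => x.split(",")).map(x => (x(0).toInt, x(7)))

// join on customer_id, re-key by state, sum the counts and print the top 5 states by order count
ordersByCustomer.join(customersByState).map { case (_, (one, state)) => (state, one) }.reduceByKey(_ + _).sortBy(x => x._2, false).take(5).foreach(println)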

Thus this post highlights how you can use certain transformations and actions in Apache Spark with the Scala language to answer certain types of business analytics questions.