How to: Use Apache Spark to Find and Filter Dirty Records (Scala/RDD)

Getting valuable insights out of a data science pipeline is never straightforward. Data folks usually run into a number of issues with their data, and most of them relate to data quality. There may be a lot of dirty records in your data-set, i.e. records with anomalous values or values that don't conform to your interfaces. The predicament worsens when you are dealing with data-sets at huge scale, i.e. millions or billions of rows. Finding out which records have issues and filtering them out can be a time-, processing- and memory-intensive task. In such instances, distributed, in-memory computation frameworks like Apache Spark really come in handy. By coupling Apache Spark's data abstractions (core Spark RDDs or Spark SQL DataFrames) with Scala's functional programming constructs, you can perform data cleansing very efficiently. In this post, I am going to cover one such use-case.

Let's say that we have a data-set of all postcodes of Australia and the corresponding territories, which looks something like this:

0800 Darwin (NT)
0800 Darwin City (NT)
0800 Darwin GPO (NT)
0801 Darwin GPO Private Boxes (NT)
0810 Alawa (NT)
0810 Brinkin (NT)
0810 Casuarina (NT)
0810 Coconut Grove (NT)
0810 Jingili (NT)
0810 Lee Point (NT)
0810 Millner (NT)
0810 Moil (NT)
0810 Nakara (NT)
0810 Nightcliff (NT)
0810 Rapid Creek (NT)
0810 Tiwi (NT)
0810 Wagaman (NT)
0810 Wanguri (NT)

P.S.: you can access such a data-set at this link

Let's also assume that you want to use this data-set in your data science pipeline in some context (e.g. look-ups, by joining it with some other data-set that has more demographic detail). You want to ensure that the postal codes are correct and that there aren't any faulty or dirty entries. In this case the number of records won't be that huge; even so, going through it record by record and checking whether the postal codes are "okay" would be time consuming. A natural choice would be to write a script that checks it. However, if that program runs in a runtime environment localised to one machine, you may run into memory issues. So putting Apache Spark to use, where the processing is distributed across multiple executors in YARN containers provisioned on multiple worker nodes, makes quite a lot of sense.

How to use Apache Spark's Scala APIs to handle dirty records:

This problem can be approached in many ways. We could formulate a regular expression and check whether each record matches the pattern. However, I am going to present another simple yet effective way to check if there are any issues in our data-set, i.e. whether the postal codes are all correct. The logic is based on the premise that Australian postal codes are 4 digits long, so if any postal code is longer than 4 digits, it is a candidate to be faulty. The data-set available at the link is all clean; for demonstration purposes, I've tampered with the data myself to make the tutorial more intuitive.
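Just for reference, the regex variant could be as simple as a predicate like the one below. This is only a sketch of what "conforms" would mean here (looksClean is a hypothetical helper name), assuming the same tab-separated layout and 4-digit rule used in the rest of the post:

scala> // hypothetical helper: true if the first (tab-separated) field is exactly 4 digits
scala> def looksClean(line: String): Boolean = line.split("\t")(0).matches("\\d{4}")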

So, assuming that we have this data-set available in the Hadoop Distributed File System (HDFS), let's load it in Apache Spark to create an RDD of String:

scala> val oz_postcodes_rdd = sc.textFile("/user/ielahi/sample_ds/australia_post_codes.txt")

Doing a quick check of how many records there are in this data-set:

scala> oz_postcodes_rdd.count
res15: Long = 13582
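Before going further, a quick peek at a few raw lines is a handy sanity check (just a sketch of the usual pattern; the output is whatever the first lines of your copy of the file contain):

scala> // print the first 5 raw lines of the RDD
scala> oz_postcodes_rdd.take(5).foreach(println)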

Let's see what the maximum length of the postal code entries in our data-set is. It should be 4.

scala> oz_postcodes_rdd.map(x=>x.split("\t")(0)).keyBy(x=>x.length).map(x=>x._1).max
res16: Int = 5

The result we got is 5, which signals that there are definitely some faulty entries. We applied a map transformation (which applies a higher-order function to each element of the RDD): we first split each element of our RDD (i.e. the string containing each line of our file), which returns an Array, and retrieve the first element of that array (as done with x=>x.split("\t")(0)), which corresponds to the postal code. Then we apply the keyBy transformation to generate a key-value pair RDD of the form (length, postal code), where the key is the length of the string and the value is the element itself (e.g. (4,0810)). Then, through another map transformation, we retrieve just the first element of each tuple and call the "max" action to find the maximum value.
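As a side note, the same maximum can be computed a bit more directly with a single map; this is just an equivalent variant of the line above, not a different technique:

scala> // length of the first (tab-separated) field of every line, then the maximum
scala> oz_postcodes_rdd.map(x => x.split("\t")(0).length).max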

Let's see how many faulty records we have in our data-set:

scala> oz_postcodes_rdd.map(x=>x.split("\t")(0)).keyBy(x=>x.length).filter(x=>x._1>4).count
res17: Long = 2

That means we have 2 faulty records.

The next natural question would be to see which records are faulty, and we can do that as follows:

scala> oz_postcodes_rdd.map(x=>x.split("\t")(0)).keyBy(x=>x.length).filter(x=>x._1>4).collect
res18: Array[(Int, String)] = Array((5,27759), (5,35012))

This shows us the records which aren't correct: two entries with 5-digit postal codes. Having identified these dirty records, we can discard them and proceed with our analysis.
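As for actually discarding them, one way (a sketch only, reusing the same tab-separated layout and 4-digit rule; clean_postcodes_rdd is just an illustrative name) is to keep only the lines whose first field is exactly 4 characters long and use the resulting RDD downstream:

scala> // keep only lines whose postal code field is exactly 4 characters long
scala> val clean_postcodes_rdd = oz_postcodes_rdd.filter(x => x.split("\t")(0).length == 4)
scala> clean_postcodes_rdd.count

If the two over-long entries are the only problems, the count should drop from 13582 to 13580.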

So you can see that, using the Scala language and Apache Spark's RDD APIs, we can quickly investigate which records in our data-set are wrong, which turns out to be quite helpful in a data science workflow.

Hope that was helpful. See you in the next post.