How to Handle Files with Custom Delimiters in Apache Spark (Scala)

Consider the following problem statement for handling custom delimiters in Apache Spark using Scala:

You are working as a Big Data maverick in an organisation, and you are responsible for putting Apache Spark to use for a myriad of use cases. The company has introduced a new source system (it could be a CRM application or any other application server) that generates output as text files on a daily basis. You are required to develop an Apache Spark application (preferably in Scala if you really want to squeeze the maximum performance out of Apache Spark) that can ingest and subsequently process these text files. Sounds easy? Here’s the catch: the text files generated by the newly deployed source system use some unusual delimiters, i.e. the character(s) that separate a record into fields (field delimiters) and the character(s) that define where a record starts and ends (the record delimiter). Typically you were used to working with comma-delimited files (aka CSV files), which looked like this:

1,halo,xbox
2,the last of us,ps4
3,rayman legends,ps4

where , was the field delimiter and the newline character (typically represented as \n) was the record delimiter, so each record appeared on a new line.

Being a Spark evangelist, you had a number of Spark APIs at your disposal, e.g. SparkContext’s textFile method, which were fit for that purpose. But now the output generated by the source system looks something like this:

1;;halo;;xbox+2;;the last of us;;ps4+3;;rayman legends;;ps4

where the field delimiter is ;; and the record delimiter is +. How will you load such a file in Apache Spark to form RDDs and subsequently process them (either as key-value pair RDDs or as Spark SQL DataFrames)?

Solution for handling custom delimiters in text files in Apache Spark:

Firstly, in such scenarios you can’t use SparkContext’s textFile method as-is, because there are some nuances to it:

  1. SparkContext’s textFile method is a wrapper around the lower-level hadoopFile / newAPIHadoopFile methods, which at the back end make use of Hadoop’s TextInputFormat (which extends the FileInputFormat class that defines logical splits on the basis of HDFS blocks). If you dig deeper into TextInputFormat, you will discover that it uses a RecordReader (which determines how to form key-value pairs for Hadoop’s mapper from a given split) that parses the split to produce the following kind of key-value pair: the key is the byte offset from the beginning of the file and the value is each line. In simpler words, when you use sc.textFile, it uses the TextInputFormat class at the back end, which treats each line as a record and starts a new record whenever it encounters a newline character. That’s why when you use sc.textFile to load a file like the one highlighted above, it generates an RDD of String where each element corresponds to a line (see the sketch after this list).
  2. If you override this default behaviour of TextInputFormat, or write your own custom InputFormat that handles the custom delimiters, you achieve your desired objective.
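
To make the first point concrete, here is a minimal sketch of roughly what sc.textFile does behind the scenes, written against the newer Hadoop API. The path is a placeholder and the variable names are illustrative, not from the original example:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// TextInputFormat yields (byte offset, line) pairs; textFile keeps only the
// value of each pair, converted to a String.
val rawPairs = sc.newAPIHadoopFile(
  "path_to_weird_file_with_custom_delimiter",   // placeholder path
  classOf[TextInputFormat],
  classOf[LongWritable],                        // key: byte offset from the start of the file
  classOf[Text]                                 // value: the record produced by the RecordReader
)

val lines = rawPairs.map { case (_, text) => text.toString }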

To handle such scenarios in Apache Spark where you encounter custom delimiters in a text file, there are multiple ways to go about it, but I am gonna share the simplest one:

Using SparkContext to Set TextInputFormat Hadoop Configuration:

There are various places where you can override configurations in Apache Spark: when you launch a Spark application, in the Apache Spark configuration files, or within your program itself. The one that takes maximum precedence is the one you set in your program. Here’s how to go about it.

Let’s say that you have launched a Spark shell, which already exposes a SparkContext object with the identifier sc. You can then access its hadoopConfiguration and call its set method to override the property you want. Here’s how exactly you do it:

scala> sc.hadoopConfiguration.set("textinputformat.record.delimiter","+")

This overrides TextInputFormat’s default record delimiter, i.e. \n, to the one you specified. It also sets this for the life of this SparkContext, i.e. this session; if you launch another application with a new SparkContext object, the delimiter will again default to \n.
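
If you are writing a standalone application rather than using the shell, the same idea applies. Here is a minimal sketch (the application name and path are illustrative): you can either set the property on sc.hadoopConfiguration exactly as above, or pass it through SparkConf with the spark.hadoop. prefix, which Spark copies into the Hadoop Configuration:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("CustomDelimiterIngest")                          // illustrative app name
  .set("spark.hadoop.textinputformat.record.delimiter", "+")    // copied into the Hadoop Configuration

val sc = new SparkContext(conf)

// Equivalent alternative, set directly on the Hadoop Configuration:
// sc.hadoopConfiguration.set("textinputformat.record.delimiter", "+")

val rddFile = sc.textFile("path_to_weird_file_with_custom_delimiter")  // placeholder path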

Once set, if you then use the textFile method of Apache Spark:

scala> val rddFile=sc.textFile("path_to_weird_file_with_custom_delimiter")

and if you then do a collect (which I generally discourage, but here the data set is small enough to fit in the driver’s JVM memory), you will see that it treated each record properly and split the records wherever it encountered a +.
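
For instance, with the sample file shown earlier, the output would look roughly like this (a sketch; the exact res identifier will vary, and a trailing newline in the file would show up at the end of the last record):

scala> rddFile.collect()
res0: Array[String] = Array(1;;halo;;xbox, 2;;the last of us;;ps4, 3;;rayman legends;;ps4)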

Handling Custom Field Delimiters in Text Files in Apache Spark:

After you’ve handled the record delimiter, the field delimiters can be handled conveniently with a map transformation that converts the RDD[String] into an RDD[Array[String]], i.e. you operate on each element of the RDD[String] and use Scala String’s split method, specifying the character(s) on which to split each String into an Array of Strings, as follows:

scala> val rddTransformed=rddFile.map(eachLine=>eachLine.split(";;"))

and if you view some of its elements using take for instance:

scala> rddTransformed.take(3)

Array[Array[String]] = Array(Array(1, halo, xbox), Array(2, the last of us, ps4), Array(3, rayman legends, ps4))

you will see that you now have a nice RDD[Array[String]] which you can use for subsequent processing (e.g. converting to Spark SQL DataFrames, converting to pair RDDs, or whatever else you need).
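
As an illustration of that last step, here is a minimal sketch of converting the RDD[Array[String]] into a DataFrame and into a pair RDD. It assumes a Spark 2.x shell where a SparkSession named spark is available, and the column names are purely illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Illustrative column names; adjust to whatever the source system actually provides.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("title", StringType),
  StructField("platform", StringType)
))

// RDD[Array[String]] -> DataFrame
val rowRDD = rddTransformed.map(fields => Row(fields(0), fields(1), fields(2)))
val gamesDF = spark.createDataFrame(rowRDD, schema)
gamesDF.show()

// Or, RDD[Array[String]] -> pair RDD keyed by the first field
val pairRDD = rddTransformed.map(fields => (fields(0), (fields(1), fields(2))))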