How to: Spark to read HDFS files, union DataFrames and store in Parquet format (Scala, Spark SQL)

Let's consider this interesting use-case of Apache Spark:

You are using Apache Spark as your distributed in-memory computation framework. You have a set of source systems that generate their output as text files (ideally UTF-8 encoded, with a newline character as the record delimiter). You need a mechanism to read all of the files available in a specific directory, process them, and store them after converting them into Parquet (an optimised columnar storage format, originally developed by Twitter and Cloudera, that is heavily used in Big Data systems for analytical use-cases). Also, you don't want to rely on HiveContext (which is deprecated in Spark 2.x) to achieve this task, since that would also require a table to be created at the destination location. You need the output in Parquet format because your downstream applications include Impala (an open source, distributed SQL-on-Hadoop query engine developed by Cloudera), which the Data Analysts in your organization use for ad-hoc, interactive analytical queries in a SQL dialect.

In this context, at first impression you may think there shouldn't be any challenge, because it's all about converting from one format to another after doing some processing. However, when you work with Spark, the way to achieve this task is through the Spark SQL APIs. In Spark SQL, using its higher-level abstraction/data structure, i.e. DataFrames, one can store the output in a directory in the desired Parquet format. However, the catch with the Spark SQL API, and specifically the DataFrame write APIs, is that by default the destination directory must not already exist; if it does, an exception is thrown. Given this constraint, if you have 10 files, you would have to create 10 directories to store the corresponding Parquet output of those text files, which isn't optimal.
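As a quick illustration of that constraint (a minimal sketch, assuming df is any existing DataFrame and the path is just an example), the default save mode of the DataFrame writer refuses to write to a path that already exists:

//the first write succeeds and creates the target directory:
df.write.parquet("/user/Cloudera/ie_stuff/impala/exp_demo")

//a second write to the same path throws an exception, because the default
//save mode for DataFrame writers is "error if exists":
df.write.parquet("/user/Cloudera/ie_stuff/impala/exp_demo")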

How to use the Apache Spark RDD and Spark SQL DataFrame APIs to read a list of files, take their union and store the result in Parquet format:

Here's how you can use Apache Spark, specifically the Spark SQL API and the DataFrame data structure, to store the output of a list of files in a single directory. The key is to transform each RDD into a DataFrame, take the union of those DataFrames, and then store the unioned DataFrame in the target directory in Parquet format. One can argue that we could just use the textFile function and pass a directory path as its parameter to load all the files into an RDD and proceed from there. However, what if you only want to process a specific list of files within that directory? That's where the complexity increases, and that's what the solution below addresses.
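For reference, here is the directory-based approach mentioned above, as a minimal sketch (the path is reused from the full example further down):

//loads every file under the given HDFS directory into a single RDD[String]:
val all_lines = sc.textFile("/user/Cloudera/ie_stuff/spark/exp1")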

Below is the code, in Scala, that uses the Spark core API and the Spark SQL API to do the required task. I haven't packaged this code as an executable object, so you can run it in the Spark shell and see the output interactively:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
import org.apache.hadoop.fs._
import sqlContext.implicits._
import scala.collection.mutable._

//specifying the custom record delimiter for the files:
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "<delimiter>")

//assuming that the field delimiter is stored in a field_delim variable:
val field_delim = "<field_delimiter>"

//a helper function to get a list of files from the specified HDFS path:
def get_hdfs_files(given_path: String): ArrayBuffer[String] = {
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val flist = fs.listFiles(new Path(given_path), true)
  val arr_buff = new ArrayBuffer[String]
  while (flist.hasNext) {
    arr_buff += flist.next.getPath.toString
  }
  arr_buff
}

/*a helper function that generates a mutable ArrayBuffer consisting of one DataFrame per file.
*it accepts an ArrayBuffer of file paths and the schema for constructing DataFrames out of those files' RDDs.
*/
def data_frames_buffer_generator(given_flist: ArrayBuffer[String], given_schema: org.apache.spark.sql.types.StructType): ArrayBuffer[org.apache.spark.sql.DataFrame] = {
  val arr_buff = new ArrayBuffer[org.apache.spark.sql.DataFrame]
  for (efile <- given_flist) {
    //createDataFrame expects an RDD[Row], so each line is split on the field delimiter
    //and wrapped in a Row matching the schema (id is an IntegerType, hence the toInt):
    val rdd_x = sc.textFile(efile).map { line =>
      val fields = line.split(field_delim)
      Row(fields(0).toInt, fields(1), fields(2))
    }
    val df = sqlContext.createDataFrame(rdd_x, given_schema)
    arr_buff += df
  }
  arr_buff //returning the array buffer; each element is the DataFrame for the corresponding file
}

//specifying the schema of the DataFrame:
val my_schema = StructType(List(
  StructField("id", IntegerType, false),
  StructField("name", StringType, false),
  StructField("location", StringType, false)))

//getting the list of files in the HDFS directory:
val hdfs_flist = get_hdfs_files("/user/Cloudera/ie_stuff/spark/exp1")

//generating a mutable ArrayBuffer of Spark SQL DataFrames from the list of files in the HDFS path:
val df_buffer = data_frames_buffer_generator(hdfs_flist, my_schema)

//generating an empty DataFrame to seed the subsequent union:
var temp_df = sqlContext.createDataFrame(sc.emptyRDD[Row], my_schema)

//iterating through each DataFrame in the returned mutable ArrayBuffer and generating the union of all DataFrames in the buffer:
for (each_df <- df_buffer) {
  temp_df = temp_df.unionAll(each_df)
}
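
//as an aside, the same union could also be expressed as a single fold over the buffer
//(assuming df_buffer is non-empty), e.g.:
//val temp_df = df_buffer.reduce((df_a, df_b) => df_a.unionAll(df_b))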

//here temp_df represents a DataFrame consisting of the union of all the file DataFrames
//writing it out as Parquet files (df.write.parquet is available from Spark 1.4 onwards; on older versions use saveAsParquetFile):
temp_df.write.parquet("/user/Cloudera/ie_stuff/impala/exp2")

Here I haven't applied any processing or transformations to the RDDs; the focus was on how to read a list of files from HDFS, convert each of them into an RDD and then a DataFrame, take the union of those DataFrames, and store the final DataFrame as Parquet files in the target HDFS directory.
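
If you want a quick sanity check, you can read the Parquet output back into a DataFrame in the same shell. This is just a verification sketch (it assumes Spark 1.4 or later for the read.parquet API, and reuses the target path from the write step above):

//reading the Parquet files back and inspecting the schema and a few rows:
val check_df = sqlContext.read.parquet("/user/Cloudera/ie_stuff/impala/exp2")
check_df.printSchema()
check_df.show(5)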

Hope that it was insightful. Till next time.