
PySpark: Read File to RDD and Convert to Data Frame

Through this blog, I am trying to explain different ways of creating RDDs by reading files and then creating Data Frames out of those RDDs.


This blog is for:

PySpark (Spark with Python) analysts and anyone interested in learning PySpark.

Pre-requisites:

A good knowledge of Python and a basic knowledge of PySpark.


RDD (Resilient Distributed Dataset):

An RDD is an immutable distributed collection of objects and is the fundamental data structure of Spark. By default, when you read from a file using sparkContext, it is converted into an RDD with each line as an element of type string. However, an RDD lacks an organised structure.

Data Frames:

A Data Frame is created as a higher-level abstraction by imposing a structure on the above distributed collection. It has rows and columns (almost similar to pandas). From Spark 2.3.x, Data Frames and Datasets have become more popular and are used more than RDDs.

Learn in more detail here:
https://data-flair.training/blogs/apache-spark-rdd-vs-dataframe-vs-dataset/


Now let's start with the file to RDD conversions.



FILE TO RDD conversions:

1. A file stored in HDFS can be converted into an RDD using sparkContext itself. Since sparkContext can read the file directly from HDFS, it will convert the contents directly into a Spark RDD (Resilient Distributed Dataset). In the Spark CLI (the pyspark shell), sparkContext is already available as sc.

Example: reading from a text file
textRDD = sc.textFile("HDFS_path_to_text_file")
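
To quickly verify the read, we can look at a few elements (a small sketch; the path above is just a placeholder, and take and count are standard RDD actions):

textRDD.take(2)    # returns the first 2 lines of the file as a list of strings
textRDD.count()    # returns the total number of lines in the file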

2. A file stored in the local file system cannot be read by sparkContext directly. So we need to read it into a list using core Python APIs and then convert that list into an RDD using sparkContext.

Example:

with open("local_path_to_file") as file:
    file_list = file.read().splitlines()  # converts each line of the file into an element of a list; file_list now holds each line as a string

fileRDD = sc.parallelize(file_list)  # converts the list into an RDD where each element is of type string
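
As a quick sanity check (a sketch, assuming the read above succeeded), the RDD should contain one string element per line of the file:

fileRDD.count()    # number of lines read from the local file
fileRDD.first()    # the first line of the file, as a string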



RDD to DF conversions:

An RDD is nothing but a distributed collection. By default, when you read from a file into an RDD, each line will be an element of type string.

A DF (Data Frame) is a structured representation of an RDD.

To convert an RDD of strings to a DF, we need to first convert the RDD elements into tuple, list, dict, or Row type.

As an example, let's say we have a file named orders containing 4 columns of data ('order_id', 'order_date', 'customer_id', 'status'), in which the columns are delimited by commas.

And let us assume the file has been read using sparkContext into an RDD (using one of the methods mentioned above) and that the RDD is named 'ordersRDD'.
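
For illustration only, here is a small stand-in for 'ordersRDD' built from two made-up lines (the values are hypothetical; only the 4-column, comma-delimited layout matters). Note that calling toDF() directly on this RDD of plain strings would fail, which is why the conversions below are needed:

sample_lines = ["1,2013-07-25 00:00:00.0,11599,CLOSED",
                "2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT"]  # hypothetical sample rows
ordersRDD = sc.parallelize(sample_lines)  # each element is one comma-delimited string
# ordersRDD.toDF()   # would fail: a schema can not be inferred from plain string elements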

Now let us convert the RDD into a DF:

There are 4 ways:


RDD to DF using tuples:

#Here we are passing column names as a list

ordersTuple=ordersRDD.map(lambda o: (int(o.split(",")[0]),o.split(",")[1],int(o.split(",")[2]),o.split(",")[3])) 

ordersTuple.toDF(['order_id','order_date','customer_id','status'])
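
To confirm the conversion worked, the result can be assigned and inspected (a usage sketch; Python ints are inferred as long columns and the remaining fields as string):

ordersDF = ordersTuple.toDF(['order_id','order_date','customer_id','status'])
ordersDF.printSchema()   # order_id and customer_id as long, order_date and status as string
ordersDF.show(5)         # displays the first 5 rows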

RDD to DF using Row:

from pyspark.sql import Row

Method 1:

#Here we are passing column names as a list

ordersRow=ordersRDD.map(lambda o: Row(int(o.split(",")[0]),o.split(",")[1],int(o.split(",")[2]),o.split(",")[3]))

ordersRow.toDF(['order_id','order_date','customer_id','status'])



Method 2:

#Here we are passing column names at the time of mapping itself, somewhat similar to a dict

ordersRow=ordersRDD.map(lambda o: Row(order_id=int(o.split(",")[0]),order_date=o.split(",")[1],customer_id=int(o.split(",")[2]),status=o.split(",")[3]))

ordersRow.toDF()
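
Since each Row here already carries its field names, toDF() needs no column list. One thing to note (a usage sketch for Spark 2.x): a Row created with keyword arguments sorts its fields alphabetically, so the resulting column order may differ from the order they were typed:

ordersDF = ordersRow.toDF()
ordersDF.printSchema()   # columns appear as customer_id, order_date, order_id, status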

RDD to DF using List:

#Here we are passing column names as a list

ordersList=ordersRDD.map(lambda o: [int(o.split(",")[0]),o.split(",")[1],int(o.split(",")[2]),o.split(",")[3]])

ordersList.toDF(['order_id','order_date','customer_id','status'])

RDD to DF using dictionary (this is deprecated; the corresponding recommended method uses the Row type. It can still be used, though (verified in Spark 2.3.1)):

#Here we are passing column names at the time of mapping itself

ordersDict=ordersRDD.map(lambda o: {'order_id':int(o.split(",")[0]),'order_date':o.split(",")[1],'customer_id':int(o.split(",")[2]),'status':o.split(",")[3]})

ordersDict.toDF()
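
Running this in Spark 2.x also prints a UserWarning saying that inferring schema from dict is deprecated and suggesting pyspark.sql.Row instead, which is why the Row approach above is preferred. A quick check (sketch) that it still produces a usable DF:

ordersDF = ordersDict.toDF()
ordersDF.printSchema()   # column names are taken from the dict keys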
