
Different ways of Transposing a Dataframe in Pyspark

When I started working on transposing DataFrames, I came across the different methods below. I am sharing all of that information here.



Creating a test input DataFrame to be transposed

ds = {'one':[0.3, 1.2, 1.3, 1.5, 1.4, 1.0], 'two':[0.6, 1.2, 1.7, 1.5, 1.4, 2.0]}

# Each dictionary entry becomes one row: the key first, followed by all of its values
df = sc.parallelize([(k,) + tuple(v[0:]) for k, v in ds.items()]).toDF()
df.show()
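For reference, since toDF() is called without column names, Spark auto-generates them (_1 to _7), so df.show() should print something close to the table below (this assumes an already available SparkContext sc and SparkSession spark; exact formatting may vary by Spark version):

+---+---+---+---+---+---+---+
| _1| _2| _3| _4| _5| _6| _7|
+---+---+---+---+---+---+---+
|one|0.3|1.2|1.3|1.5|1.4|1.0|
|two|0.6|1.2|1.7|1.5|1.4|2.0|
+---+---+---+---+---+---+---+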




Method 1 (this method converts the Spark object into a Python object: the RDD is collected as a list of tuples holding the entire data):

inp_list = df.rdd.map(tuple).collect()  # collect the rows as a list of tuples

# Unpack the list and zip all the tuples together:
# list(zip(*inp_list))[0] holds the header names and list(zip(*inp_list))[1:] holds the transposed tuples

df_transpose = spark.createDataFrame(list(zip(*inp_list))[1:], list(zip(*inp_list))[0])
df_transpose.show()
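The zip(*...) unpacking is what actually performs the transposition. As a minimal pure-Python illustration of the same trick, here is a shortened stand-in for inp_list (the values below are only for intuition and are not part of the original code):

rows = [('one', 0.3, 1.2), ('two', 0.6, 1.2)]   # shortened stand-in for inp_list
print(list(zip(*rows))[0])    # ('one', 'two')            -> used as the header
print(list(zip(*rows))[1:])   # [(0.3, 0.6), (1.2, 1.2)]  -> used as the transposed rows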

Method 2 (in this method only the header data is converted into a Python list; the rest of the transformations are carried out as Spark transformations):

groupedRDD = df.rdd.map(tuple).flatMap(lambda x : (x[0], x[1:]))  # split each row into its header name and its data tuple

# Collect the header names into a list:
# the elements of groupedRDD whose data type is 'str' are the header names, i.e. type(x) == str

header = groupedRDD.filter(lambda x : type(x) == str).collect()

# The elements of groupedRDD whose data type is tuple are the row data, i.e. type(x) == tuple.
# All the row tuples are grouped under one dummy key and zipped together to form the transposed tuples;
# the 'groupByKey().mapValues(lambda x : tuple(zip(*tuple(x))))' part does exactly that.
# flatMap(lambda x : x[1]) then drops the dummy key and yields the transposed row tuples alone.

df_T=groupedRDD.filter(lambda x : type(x)==tuple).map(lambda x : (1,x)).groupByKey().mapValues(lambda x : tuple(zip(*tuple(x)))).flatMap(lambda x : x[1]).toDF(header)
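Since the chain above is fairly dense, here is an equivalent step-by-step sketch of the same logic; the intermediate names keyedRDD, groupedRows and transposedRDD are only illustrative and not part of the original code:

keyedRDD = groupedRDD.filter(lambda x: type(x) == tuple).map(lambda x: (1, x))   # keep only the row tuples and key them all with a dummy key
groupedRows = keyedRDD.groupByKey()                                              # a single group holding every row tuple
transposedRDD = groupedRows.mapValues(lambda rows: tuple(zip(*tuple(rows))))     # zip the row tuples together -> transposed tuples
df_T = transposedRDD.flatMap(lambda x: x[1]).toDF(header)                        # drop the dummy key and build the DataFrame
df_T.show()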

Method 3 (here we go via a pure RDD approach and do not store any data as Python objects):

# Here each value is paired with its column name and keyed by its new row position; the pairs are grouped
# by position into transposed records, each record is converted into a dictionary, and this RDD of
# dictionaries is then converted into a DataFrame

df_T=df.rdd.map(tuple).flatMap(lambda x : [(y,(x[0],z)) for y,z in enumerate(x[1:])]).groupByKey().mapValues(tuple).map(lambda x : dict(x[1])).toDF()
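To make the chain easier to follow, this is roughly what the intermediate records look like for the test data (only the first two positions are written out):

# The flatMap step emits (new_row_index, (column_name, value)) pairs:
#   (0, ('one', 0.3)), (1, ('one', 1.2)), ..., (0, ('two', 0.6)), (1, ('two', 1.2)), ...
# groupByKey().mapValues(tuple) then collects each index into one transposed record:
#   (0, (('one', 0.3), ('two', 0.6))), (1, (('one', 1.2), ('two', 1.2))), ...
# Finally, dict(x[1]) turns each record into {'one': ..., 'two': ...} and toDF() builds one row per dictionary.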

Method 4 (this is the same as method 3, except that instead of the deprecated dictionary-to-DataFrame conversion, we use a Row-object-to-DataFrame conversion):

# Here each value is paired with its column name and keyed by its new row position; the pairs are grouped
# by position into transposed records, each record is converted into a Row object, and this RDD of
# Rows is then converted into a DataFrame

from pyspark.sql import Row  # Row is needed for this method

df_T=df.rdd.map(tuple).flatMap(lambda x : [(y,(x[0],z)) for y,z in enumerate(x[1:])]).groupByKey().mapValues(tuple).map(lambda x : Row(**dict(x[1]))).toDF()
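Whichever method is used, the transposed DataFrame should come out roughly like the table below; the row order is not guaranteed (groupByKey does not preserve ordering) and the column order can differ between methods and Spark versions:

+---+---+
|one|two|
+---+---+
|0.3|0.6|
|1.2|1.2|
|1.3|1.7|
|1.5|1.5|
|1.4|1.4|
|1.0|2.0|
+---+---+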

