
pyspark : groupByKey vs reduceByKey vs aggregateByKey - why reduceByKey and aggregateByKey are preferred in spark2


Through this article I am trying to simplify the concepts of three similar wide transformations: groupByKey(), reduceByKey() and aggregateByKey().


This blog is for:

pyspark (Spark with Python) analysts and anyone interested in learning pyspark.

Pre-requisites:

You should have a good knowledge of Python and a basic knowledge of pyspark functions.

Data Sets used :

For demonstration purposes, I am using the below data sets (files in HDFS):

"orders" with columns order_id,order_tmst,order_customer_id,order_status


"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty


Before getting into the functionality of these transformations, we need to understand the shuffle phase in Spark. As we know, an RDD is stored in memory as multiple partitions.

Transformations such as map(), flatMap(), filter() etc. can be applied to each of these partitions of data independently, so they are called Narrow Transformations.

But transformations such as groupByKey(), reduceByKey(), aggregateByKey() etc. need to exchange and group data between partitions. This is called Shuffling, and since a shuffle is needed for these transformations to be performed, they are called Wide Transformations.

Whenever a shuffle is performed, a new stage is created by Spark. So if a groupByKey() is performed after a filter() and a map(), then filter() and map() are executed in stage 0; once groupByKey() is called, shuffling happens and a new stage is created, which we can call stage 1.

To understand this in detail, let's take the code for calculating order revenue from the data set
"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty


code :

orderitems = sc.textFile("/public/retail_db/order_items")  # reading the order_items file into an rdd

order_rev_pair = orderitems.map(lambda order: (int(order.split(",")[1]), float(order.split(",")[4])))  # creating pairs of (order_id, order_item_subtotal)


revenueperorder = order_rev_pair.groupByKey().map(lambda x: (x[0], round(sum(x[1]), 1)))  # grouping and summing order_item_subtotal per order_id
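
To see the stage boundary this shuffle creates, a quick check (just a sketch on the rdd built above) is to print the RDD lineage: in the output of toDebugString(), the indentation change marks where the shuffle, and hence the new stage, begins.

print(revenueperorder.toDebugString())  # prints the lineage; the shuffle introduced by groupByKey shows up as a new indentation level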


What's happening in the shuffle phase for groupByKey:

Here we are applying groupByKey on the order_rev_pair rdd, which is nothing but an rdd of key-value pairs.
The key is order_id, which is an integer.

So when we apply groupByKey on an rdd (which must contain key-value pairs), the data first needs to be shuffled based on the key.

By default, shuffling happens with a hash of the key modulo the number of partitions.

Hash mod key:

Hash mod key is nothing but (hash value of the key) modulo (number of partitions of the rdd).

In Python, the hash value of a small integer is the integer itself; for a string it is different.
For finding the hash value, we can use

hash(key) # where key can be a string or an integer

Anyway, in our example the key is an integer, so hash(key) is the key itself.
Let's say we have the below key-value pairs in one partition, and similarly we have 3 partitions as a whole.

(5,299.95) => applying hash mod no. of partitions, 5 mod 3 => 2
(4,299.95) => applying hash mod no. of partitions, 4 mod 3 => 1
(2,129.99) => applying hash mod no. of partitions, 2 mod 3 => 1
(5,129.99) => applying hash mod no. of partitions, 5 mod 3 => 2

We can see the result of the mod operation has two distinct values, 2 and 1.
All values with hash mod value 2 will come under one bucket (partition)
and all with 1 will come under another bucket.

Like this, across the 3 existing partitions, we have 12 records in total.
And the possible hash mod values are only three - 0, 1, 2 (0 can also come up if we have, say, 3 as a key - 3 mod 3 is zero).
So the possible number of partitions after shuffling is three.
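
Here is a minimal sketch of the same bucketing computed in plain Python (the partition count of 3 is assumed from the example above):

num_partitions = 3  # assumed, as in the example above
pairs = [(5, 299.95), (4, 299.95), (2, 129.99), (5, 129.99)]
for key, value in pairs:
    print((key, value), "-> bucket", hash(key) % num_partitions)  # hash of a small int is the int itself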

So all values with the same hash mod value will come under the same bucket.
They will be grouped together automatically. In the above case it will be as below:

bucket 1:
(2,129.99)
(4,299.95)
bucket 2:
(5,299.95)
(5,129.99)

(Bucket 0 can also have values, which I am not showing here.)

Now this is stage 1, where groupByKey is happening.

So all records with the same order_id will be grouped together:
(5,[299.95,129.99])
(2,129.99)
(4,299.95)

If you observe, even though the data is grouped, the number of records transferred from stage 0 to stage 1 is the same.
And now we need an extra map function to perform the aggregation, as shown below.
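
That extra step is the map in the code earlier; equivalently, a sketch of the same aggregation written with mapValues on the grouped rdd:

revenueperorder = order_rev_pair.groupByKey().mapValues(lambda vals: round(sum(vals), 1))  # summing the iterable of subtotals per order_id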


What's happening in the shuffle phase for reduceByKey:

If we take the same scenario as above, while the hash mod operation itself is being performed in stage 0,
reduceByKey will already be applied within each partition, and bucket 1 and bucket 2 of stage 1 will look like this:

 bucket 1:
(2,129.99)
(4,299.95)
bucket 2:
(5,429.94)

Like this, the intermediate aggregation is done by reduceByKey for every partition in stage 0 itself,
and in stage 1, the final level of aggregation is performed by reduceByKey.

So there is a combiner implementation here, due to which fewer records are transferred between stages.


This difference becomes much bigger with large data sets: if we use reduceByKey, the 'shuffle read' size will be smaller.
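
For the revenue example above, the reduceByKey version would be a sketch like the below (using the order_rev_pair rdd defined earlier; the reduce function must be associative and commutative):

revenueperorder = order_rev_pair.reduceByKey(lambda x, y: x + y)  # partial sums per partition in stage 0, final sums in stage 1
revenueperorder = revenueperorder.map(lambda x: (x[0], round(x[1], 1)))  # rounding the total revenue per order_id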

Now, aggregateByKey is almost the same as reduceByKey; the difference is that we provide an initial (zero) value and the intermediate combiner logic as a separate function, so the result type can differ from the value type.

If we take the above scenario itself, and along with the total revenue we also need to calculate the total number of items per order, we have to perform two aggregations together, so the intermediate logic will change.


order_rev_pair.aggregateByKey((0.0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))  # (total revenue, item count) per order_id

Here the first argument (the tuple) is the initialization of the desired output, and the second argument is the combiner logic, which will be executed within each partition in stage 0.

And the third argument is the final aggregation logic, which will be executed in stage 1.
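
Putting it together, a minimal sketch (rdd names assumed from the code above) that yields (order_id, total_revenue, item_count):

rev_and_count = order_rev_pair.aggregateByKey(
    (0.0, 0),                                   # zero value: (revenue, item count)
    lambda acc, v: (acc[0] + v, acc[1] + 1),    # stage 0: fold a subtotal into the running (revenue, count)
    lambda a, b: (a[0] + b[0], a[1] + b[1]))    # stage 1: merge partial (revenue, count) tuples across partitions
result = rev_and_count.map(lambda x: (x[0], round(x[1][0], 1), x[1][1]))
result.take(5)  # sample the output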



