pyspark: groupByKey vs reduceByKey vs aggregateByKey - why reduceByKey and aggregateByKey are preferred in Spark 2
In this article I try to simplify the concepts behind three similar wide transformations: groupByKey(), reduceByKey() and aggregateByKey().
This blog is for:
pyspark (Spark with Python) analysts and anyone interested in learning pyspark.
Prerequisites:
A good knowledge of Python and a basic knowledge of pyspark functions.
Data sets used:
For demonstration purposes, I am using the data sets below (files in HDFS):
"orders" with columns order_id,order_tmst,order_customer_id,order_status
"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty
Before getting into what these transformations do, we need to understand the shuffle phase in Spark. An RDD is made up of multiple partitions, which Spark processes in memory where possible.
Transformations such as map(), flatMap() and filter() can be applied to each partition independently, without moving data between partitions. These are called narrow transformations.
Transformations such as groupByKey(), reduceByKey() and aggregateByKey(), however, need to exchange data between partitions so that all records with the same key end up together. This exchange is called shuffling, and transformations that require a shuffle are called wide transformations.
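To make the narrow side of this distinction concrete, here is a plain-Python sketch (no Spark involved) that models an RDD as a list of partitions, each a list of records. The helper names `narrow_map` and `narrow_filter` and the sample records are hypothetical, chosen only for illustration.

```python
# Model an "RDD" as a list of partitions; each partition is a list of records.
# Hypothetical sample records shaped like (order_id, subtotal) pairs.
partitions = [
    [(5, 299.95), (4, 299.95)],
    [(2, 129.99), (5, 129.99)],
]

def narrow_map(parts, fn):
    # Narrow: output partition i is computed from input partition i alone,
    # so no record ever moves to another partition.
    return [[fn(rec) for rec in part] for part in parts]

def narrow_filter(parts, pred):
    # Also narrow: records are dropped in place, partition by partition.
    return [[rec for rec in part if pred(rec)] for part in parts]

big_orders = narrow_filter(partitions, lambda rec: rec[1] > 200)
keys_only = narrow_map(partitions, lambda rec: rec[0])
print(big_orders)  # [[(5, 299.95), (4, 299.95)], []]
print(keys_only)   # [[5, 4], [2, 5]]
```

Grouping by key cannot be written this way. Key 5 appears in both partitions, so its records must first be moved to the same place, and that movement is the shuffle.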
Whenever a shuffle is performed, Spark starts a new stage. For example, if groupByKey() is performed after a filter() and a map(), then filter() and map() are executed in stage 0. Once groupByKey() is called, a shuffle happens and a new stage, which we can call stage 1, is created.
To understand this in detail, let's take the code for calculating order revenue from the data set "orderitems" (columns order_item, order_item_order_id, product_id, order_qty, order_item_subtotal, price_per_qty).
Code:
# sc is the SparkContext that the pyspark shell creates for you
orderitems = sc.textFile("/public/retail_db/order_items")  # read the order_items file from HDFS
# Split each line once and build (order_id, order_item_subtotal) pairs (fields 1 and 4)
order_rev_pair = orderitems.map(lambda line: line.split(",")).map(lambda f: (int(f[1]), float(f[4])))
# Group the subtotals per order_id, then sum each group (rounded to 1 decimal place)
revenueperorder = order_rev_pair.groupByKey().map(lambda x: (x[0], round(sum(x[1]), 1)))
What happens in the shuffle phase for groupByKey:
Here we apply groupByKey to the order_rev_pair RDD, which is an RDD of key-value pairs.
The key is the order id, which is an integer.
When groupByKey is applied to such a pair RDD, the data first has to be shuffled based on the key.
By default, the shuffle uses hash partitioning: each record is sent to the partition given by the hash of its key modulo the number of partitions.
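Hash mod key:
Hash mod key is simply (hash value of the key) modulo (number of partitions of the RDD).
In CPython, the hash value of a small non-negative integer is the integer itself. For a string it is a different number, which is randomized per Python process unless PYTHONHASHSEED is set.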
To find the hash value, we can use:
hash(key) # where key can be a string or an integer
In our example the key is an integer, so hash(key) is the key itself.
Let's say one partition holds the key-value pairs below, and there are 3 partitions in total.
(5,299.95) => hash mod number of partitions: 5 mod 3 => 2
(4,299.95) => hash mod number of partitions: 4 mod 3 => 1
(2,129.99) => hash mod number of partitions: 2 mod 3 => 1
(5,129.99) => hash mod number of partitions: 5 mod 3 => 2
The mod operation gives two distinct values here: 2 and 1.
All records with hash mod value 2 go to one bucket (partition),
and all records with value 1 go to another bucket.
Assuming each of the 3 partitions holds 4 records like these, there are 12 records in total.
The possible hash mod values are only 0, 1 and 2 (0 appears, for example, when 3 is a key, since 3 mod 3 is 0).
So after the shuffle there are three partitions. (The number of output partitions can also be set explicitly with the numPartitions argument.)
All records with the same hash mod value land in the same bucket.
Records are grouped by bucket, but Spark does not guarantee that they are sorted by key. In the case above the buckets look like this:
bucket 1:
(2,129.99)
(4,299.95)
bucket 2:
(5,299.95)
(5,129.99)
(Bucket 0 can also receive records, for example from keys that are multiples of 3, but it is not shown here.)
This is stage 1, where groupByKey actually runs.
All records with the same order_id are grouped together:
(5,[299.95,129.99])
(2,[129.99])
(4,[299.95])
Notice that even though the data ends up grouped, the number of records transferred from stage 0 to stage 1 is the same as the number of input records, because groupByKey does not combine anything before the shuffle.
We also need an extra map function afterwards to perform the actual aggregation.
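Here is a plain-Python sketch of that behaviour on the example partition. It counts how many records cross the stage boundary and then applies the extra aggregation step. `group_by_key` is a hypothetical stand-in that imitates the idea, not Spark's implementation.

```python
from collections import defaultdict

# The stage-0 partition from the example: (order_id, subtotal)
partition = [(5, 299.95), (4, 299.95), (2, 129.99), (5, 129.99)]

def group_by_key(records):
    # groupByKey-style: no map-side combining, so every record is shuffled
    shuffled = list(records)
    groups = defaultdict(list)
    for key, value in shuffled:  # stage 1: collect the values for each key
        groups[key].append(value)
    return len(shuffled), dict(groups)

num_shuffled, grouped = group_by_key(partition)
# The extra map step that actually aggregates each group
revenue = {key: sum(values) for key, values in grouped.items()}
print(num_shuffled)  # 4
print(grouped)       # {5: [299.95, 129.99], 4: [299.95], 2: [129.99]}
print(revenue)
```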
What happens in the shuffle phase for reduceByKey:
Take the same scenario as above. With reduceByKey, Spark combines the values for each key within every partition in stage 0, before the records are routed to buckets by hash mod.
As a result, bucket 1 and bucket 2 for stage 1 look like this:
bucket 1:
(2,129.99)
(4,299.95)
bucket 2:
(5,429.94)
In this way, reduceByKey performs an intermediate aggregation for every partition in stage 0,
and in stage 1 it performs the final aggregation.
Because of this combiner, fewer records are transferred between the stages (3 instead of 4 in this small example).
On large data sets the difference is much bigger, so with reduceByKey the 'Shuffle Read' size shown in the Spark UI will be smaller.
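The sketch below imitates this two-step reduction in plain Python: a map-side combine per partition in stage 0, then a final merge per key after the shuffle. `combine_locally` and `reduce_by_key` are hypothetical names, and the code is an illustration of the idea, not Spark's implementation.

```python
from collections import defaultdict
from operator import add

def combine_locally(partition, func):
    # Stage 0 (map-side combine): merge the values per key inside one partition
    combined = {}
    for key, value in partition:
        combined[key] = func(combined[key], value) if key in combined else value
    return list(combined.items())

def reduce_by_key(partitions, func, num_partitions):
    # Stage 0: combine each partition, then route the (fewer) records by hash mod
    buckets = defaultdict(list)
    shuffled = 0
    for part in partitions:
        for key, value in combine_locally(part, func):
            buckets[hash(key) % num_partitions].append((key, value))
            shuffled += 1
    # Stage 1: final merge per key within each bucket
    result = {}
    for records in buckets.values():
        for key, value in records:
            result[key] = func(result[key], value) if key in result else value
    return shuffled, result

partition = [(5, 299.95), (4, 299.95), (2, 129.99), (5, 129.99)]
shuffled, revenue = reduce_by_key([partition], add, 3)
print(shuffled)  # 3
print(revenue)
```

On the real RDD, the equivalent call is `order_rev_pair.reduceByKey(lambda x, y: x + y)`, which needs no extra map step because the sum is already the reduce function.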
aggregateByKey works like reduceByKey, with one difference. Instead of a single function used both within and across partitions, we provide an initial (zero) value, the intermediate combiner logic and the final merge logic separately, which also allows the result type to differ from the value type.
Taking the scenario above, suppose that along with total revenue we also need the total number of items per order.
We then have to perform two aggregations together, so the intermediate logic changes:
order_rev_pair.aggregateByKey((0.0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))
Here the first argument, the tuple (0.0, 0), is the initial value of the desired output (revenue, item count). The second argument is the combiner logic, which runs within each partition in stage 0.
The third argument is the final aggregation logic, which merges the per-partition results in stage 1.
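To see how the three arguments interact, here is a plain-Python imitation of aggregateByKey, using the same zero value and the same two lambdas as the call above. `aggregate_by_key` is a hypothetical helper, and the split of the example records into two stage-0 partitions is also hypothetical. As in Spark, the zero value is applied once per key per partition.

```python
def aggregate_by_key(partitions, zero_value, seq_func, comb_func):
    # Stage 0: fold each partition's values into a per-key accumulator,
    # starting from zero_value (seq_func: accumulator, value -> accumulator)
    per_partition = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = seq_func(acc.get(key, zero_value), value)
        per_partition.append(acc)
    # Stage 1 (after the shuffle): merge the accumulators for the same key
    # (comb_func: accumulator, accumulator -> accumulator)
    result = {}
    for acc in per_partition:
        for key, partial in acc.items():
            result[key] = comb_func(result[key], partial) if key in result else partial
    return result

# Hypothetical split of the example records into two stage-0 partitions
partitions = [[(5, 299.95), (4, 299.95)], [(2, 129.99), (5, 129.99)]]
totals = aggregate_by_key(
    partitions,
    (0.0, 0),                                    # (revenue, item count)
    lambda x, y: (x[0] + y, x[1] + 1),           # combiner within a partition
    lambda x, y: (x[0] + y[0], x[1] + y[1]),     # final merge across partitions
)
print(totals[4])  # (299.95, 1)
print(totals[2])  # (129.99, 1)
```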