pyspark : groupByKey vs reduceByKey vs aggregateByKey - why reduceByKey and aggregateByKey is preferred in spark2

Through this article I am trying to simplify the concepts of three similar wide transformations such as groupByKey(),reduceByKey() and aggregateByKey().

This blog is for :

pyspark (spark with Python) Analysts and all those who are interested in learning pyspark.


Should have a good knowledge in python as well as should have a basic knowledge of pyspark functions.

Data Sets used :

For demonstrating purpose , I am using the below data sets (files in HDFS):

"orders" with columns order_id,order_tmst,order_customer_id,order_status

"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty

Before getting in to the functionalities of transformation, we need to understand the shuffling phase in  spark. As we know rdd will be stored in-memory as multiple partitions.

The transformation such as map(),flatMap(),filter() etc can be applied to each of these partitions of data independently. So these transformations are called Narrow Transformations

But transformations such as groupByKey(),reduceByKey() ,aggregateByKey() etc needs exchanging and grouping data between partitions.This is called Shuffling  and since if shuffling is needed for any transformation to perform , those transformations are called Wide  Transformations

When ever a shuffle is performed , a new stage is created by spark.Thus if say we have a 
groupByKey() is peformed after a filter() and map(), then filter() and map() will be executed at stage 0 and once groupByKey() is called , shuffling happens and a new stage is created which we can call as stage1. 

For understanding this in detail, lets take the code for calculating order revenue from data set 
"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty

code :

orderitems=sc.textFile("/public/retail_db/order_items") # OrderItems file has been read here order : (int(order.split(",")[1]),float(order.split(",")[4]))) # Creating pairs of order_id and order_item_subtotal

revenueperorder=order_rev_pair.groupByKey().map(lambda x : (x[0],round(sum(x[1]),1))) # grouping and performing sum of order_item_subtotal per order_id

whats happening in shuffle phase for groupByKey:

Here we are applying  groupByKey and on order_rev_pair rdd, which is nothing but an rdd with key value pairs.
Key is order id which is an integer.

So when we apply groupByKey on a rdd which needs to have key value pairs, first it needs to be shuffled based on key.

Shuffling  always heppens with hash mod on no.of partitions


hash mod Key is nothing but , (hash value of the key) modulo (no.of partitions of the rdd)

in python hash value of any integer is the integer itself,for string its different.
For finding hash value , we can use

hash(key) # where key can be string or integer

Any way in our example , key is an hash(key) is also the same.
Lets say we have below key value pairs in one partition  and similarly  we have 3 partitions as a whole.

(5,299.95) => applying hash mod no.partitions , 5 mod 3 => 2
(4,299.95) => applying hash mod no.partitions , 4 mod 3 => 1
(2,129.99) => applying hash mod no.partitions , 2 mod 3 => 1
(5,129.99) => applying hash mod no.partitions , 5 mod 3 => 2

we can see result of mod operation is two distinct values 2,1. 
All values with hash mod value 2 will come under one bucket(partition)
and all with 1 will come under another bucket. 

Like this, for 3 existing partitions, we have 12 records.
And possible hash mod values are only three - 0,1,2.(0 can also come if we have 3 as key - 3 mod 3 is zero)
So possible number of partitions after shuffling are three.

So all values with same hash mode value will come under same bucket.
They will be grouped and sorted automatically.In above case it will be as below

bucket 1:
bucket 2:

(Bucket 0 also can have value which I am not mentioning here)

Now this is stage 1 where groupByKey is happening .

So all records  with same order_id will be grouped together

If you see eventhough its grouped , no.of records transferred from stage 0 to stage 1 is same
And Now we need an extra map function here to perform aggregation

whats happening in shuffle phase for reduceByKey:

If we take the same scenario as above , while performing hash mod operation itself on stage 0,
reduceByKey will be performed for each partition and bucket 1 and bucket 2 of stage 1 will look like this

 bucket 1:
bucket 2:

like this for every partition in stage 0 itself , intermediate aggregation will be done by reduceByKey
And in stage 1, final  level of aggregation will be performed by reduceByKey.

So a combiner implemenation is there here due to which records transferred between stages are less.

This difference will be much high in case of large data sets so that if we use  reduceByKey , 'shuffle read' size will be 

Now aggregateByKey is as same as reduceByKey but only difference is ,we will be providing the intermediate combiner logic as a function.

If we take the above scenario itself, along with total revenue , if we need to calculate the total number of items per order,
we need to , we have to perform to aggregations together, so intermediate logic will change

order_rev_pair.aggregateByKey((0.0,0),lambda x,y: (x[0]+y,x[1]+1),lambda x,y:x[0]+y[0],(x[1]+y[1]))

Here we first tuple is initialization of desired output, second argument is the combiner logic which will be executed on stage 0.

And third argument is the final aggregation logic which will be executed on stage1.


