
pyspark : groupByKey vs reduceByKey vs aggregateByKey - why reduceByKey and aggregateByKey are preferred in Spark 2


Through this article, I am trying to simplify the concepts of three similar wide transformations: groupByKey(), reduceByKey() and aggregateByKey().


This blog is for :

pyspark (Spark with Python) analysts and all those who are interested in learning pyspark.

Pre-requisites:

You should have a good knowledge of Python as well as a basic knowledge of pyspark functions.

Data Sets used :

For demonstration purposes, I am using the below data sets (files in HDFS):

"orders" with columns order_id,order_tmst,order_customer_id,order_status


"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty


Before getting into the functionality of these transformations, we need to understand the shuffling phase in Spark. As we know, an RDD is stored in memory as multiple partitions.

Transformations such as map(), flatMap(), filter() etc. can be applied to each of these partitions of data independently. So these transformations are called Narrow Transformations.

But transformations such as groupByKey(), reduceByKey(), aggregateByKey() etc. need to exchange and group data between partitions. This exchange is called Shuffling, and since shuffling is needed for these transformations to run, they are called Wide Transformations.

Whenever a shuffle is performed, a new stage is created by Spark. Thus, say a groupByKey() is performed after a filter() and a map(): the filter() and map() will be executed in stage 0, and once groupByKey() is called, shuffling happens and a new stage is created, which we can call stage 1.
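As a quick illustration, here is a minimal sketch (reusing the order_items path from the code below) that chains a narrow filter()/map() with a wide groupByKey() and then prints the RDD lineage; the lineage shown by toDebugString() should include the ShuffledRDD introduced by the shuffle, i.e. the stage boundary.

orderitems = sc.textFile("/public/retail_db/order_items")

# narrow transformations: applied independently per partition, no shuffle
pairs = orderitems.filter(lambda line: line != "").map(lambda line: (int(line.split(",")[1]), float(line.split(",")[4])))

# wide transformation: needs a shuffle, so Spark starts a new stage here
grouped = pairs.groupByKey()

# toDebugString() returns bytes in pyspark, so decode before printing
print(grouped.toDebugString().decode())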

To understand this in detail, let's take the code for calculating order revenue from the data set
"orderitems" with columns order_item,order_item_order_id,product_id,order_qty,order_item_subtotal,price_per_qty


code :

orderitems = sc.textFile("/public/retail_db/order_items")  # reading the order_items file

order_rev_pair = orderitems.map(lambda order: (int(order.split(",")[1]), float(order.split(",")[4])))  # creating (order_id, order_item_subtotal) pairs

revenueperorder = order_rev_pair.groupByKey().map(lambda x: (x[0], round(sum(x[1]), 1)))  # grouping and summing order_item_subtotal per order_id
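To sanity-check the result, we can pull a few records to the driver with a small action (a usage sketch; the actual values depend on the data):

revenueperorder.take(5)  # returns a list of (order_id, total_revenue) tuples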


What's happening in the shuffle phase for groupByKey:

Here we are applying groupByKey on the order_rev_pair RDD, which is nothing but an RDD of key-value pairs.
The key is the order id, which is an integer.

So when we apply groupByKey on an RDD (which needs to be an RDD of key-value pairs), the data first needs to be shuffled based on the key.

By default, shuffling distributes records using a hash of the key modulo the number of partitions.

HashmodKey:

Hash mod key is nothing but (the hash value of the key) modulo (the number of partitions of the RDD).

In Python, the hash value of a small integer is the integer itself; for a string it is different.
To find the hash value, we can use

hash(key) # where key can be a string or an integer

Anyway, in our example the key is an integer, so hash(key) is just the key itself.
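Here is a plain-Python sketch of this assignment (Spark's own partitioner uses a portable hash internally, so real bucket numbers may differ; this only illustrates the idea):

num_partitions = 3
for key in [5, 4, 2, 5]:  # the keys used in the example below
    print(key, "->", hash(key) % num_partitions)
# prints 5 -> 2, 4 -> 1, 2 -> 2, 5 -> 2 (the hash of a small int is the int itself)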
Let's say we have the below key-value pairs in one partition, and similarly we have 3 partitions as a whole.

(5,299.95) => applying hash mod no. of partitions, 5 mod 3 => 2
(4,299.95) => applying hash mod no. of partitions, 4 mod 3 => 1
(2,129.99) => applying hash mod no. of partitions, 2 mod 3 => 2
(5,129.99) => applying hash mod no. of partitions, 5 mod 3 => 2

We can see the mod operation gives two distinct values, 1 and 2.
All values with hash mod value 2 will come under one bucket (partition)
and all with value 1 will come under another bucket.

Like this, say we have 12 records in total spread across the 3 existing partitions.
The possible hash mod values are only three - 0, 1, 2 (0 can also come if we have, say, 3 as a key - 3 mod 3 is zero).
So the possible number of partitions after shuffling is three.

So all values with the same hash mod value will come under the same bucket,
where they will be grouped together. In the above case it will be as below:

bucket 1:
(4,299.95)
bucket 2:
(2,129.99)
(5,299.95)
(5,129.99)

(Bucket 0 can also have values, which I am not listing here)

Now this is stage 1, where groupByKey is happening.

So all records with the same order_id will be grouped together:
(5,[299.95,129.99])
(4,[299.95])
(2,[129.99])

Notice that even though the records end up grouped, every individual record still has to be transferred from stage 0 to stage 1 - grouping does not reduce the shuffled data.
And we still need an extra map() (the sum in the code above) to perform the aggregation.


What's happening in the shuffle phase for reduceByKey:

If we take the same scenario as above, the reduce function will already be applied within each partition in stage 0, while the hash mod bucketing is being done, so bucket 1 and bucket 2 of stage 1 will look like this:

bucket 1:
(4,299.95)
bucket 2:
(2,129.99)
(5,429.94)

Like this, the intermediate aggregation is done by reduceByKey within every partition in stage 0 itself.
And in stage 1, the final level of aggregation is performed by reduceByKey.

So there is a combiner implementation here, due to which fewer records are transferred between stages.


This difference becomes much bigger with large data sets, so if we use reduceByKey, the 'shuffle read' size will be smaller.
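For reference, here is a sketch of the same revenue-per-order computation written with reduceByKey, reusing order_rev_pair from the earlier code; the same lambda acts as the map-side combiner in stage 0 and as the final merge in stage 1:

revenueperorder_reduced = order_rev_pair.reduceByKey(lambda x, y: x + y).map(lambda x: (x[0], round(x[1], 1)))  # same output as the groupByKey version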

Now, aggregateByKey is much the same as reduceByKey; the difference is that we explicitly provide an initial value plus the intermediate (per-partition) combiner logic and the final merge logic as separate functions.

If we take the above scenario itself: along with the total revenue, suppose we also need to calculate the total number of items per order. We have to perform two aggregations together, so the intermediate logic changes.


order_rev_pair.aggregateByKey((0.0,0), lambda x,y: (x[0]+y, x[1]+1), lambda x,y: (x[0]+y[0], x[1]+y[1]))

Here the first argument (the tuple) is the initial value of the desired output per key, the second argument is the combiner logic which will be executed within each partition in stage 0,

and the third argument is the final aggregation logic which will be executed in stage 1.
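The same call written out with the three pieces named (the variable names here are only for illustration), computing (total revenue, number of items) per order_id:

zero_value = (0.0, 0)  # initial (revenue, item count) accumulator for each key
seq_op = lambda acc, subtotal: (acc[0] + subtotal, acc[1] + 1)  # runs within each partition (stage 0)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])  # merges partial results across partitions (stage 1)
rev_and_count_per_order = order_rev_pair.aggregateByKey(zero_value, seq_op, comb_op)  # each element: (order_id, (total_revenue, number_of_items))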



