As beginners in Spark, many developers are confused about the difference between the map() and mapPartitions() functions. This article is an attempt to resolve that confusion.
This blog is for:
PySpark (Spark with Python) analysts and anyone interested in learning PySpark.
Prerequisites:
A good knowledge of Python and a basic understanding of PySpark functions.
If you use map() over an RDD, the function passed to it runs once for every record. That means if you have 10M records, the function executes 10M times.
This is expensive, especially in scenarios that involve connecting to a database and querying data from it.
Let's say the function we pass to map() connects to a database and queries it, and our RDD has 10M records.
That function will execute 10M times, which means 10M database connections will be created. This is very expensive.
In these kinds of scenarios we can use mapPartitions() instead of map(). mapPartitions() runs the supplied function only once per partition, as the sketch below illustrates.
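Below is a minimal sketch of that idea. The database file, table name, and lookup query are hypothetical; the point is only that the connection is created once per partition instead of once per record.

def enrich_partition(records):
    import sqlite3                                   # stand-in for any database driver
    conn = sqlite3.connect("lookup.db")              # hypothetical DB; opened once per partition
    cur = conn.cursor()
    for r in records:                                # 'records' is an iterator over one partition
        cur.execute("SELECT label FROM lookup WHERE id = ?", (r,))
        row = cur.fetchone()
        yield (r, row[0] if row else None)
    conn.close()                                     # closed once per partition

enriched = rdd.mapPartitions(enrich_partition)       # one connection per partition
# with rdd.map(...) the connect/close pair would run for every one of the 10M records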
Example:
We can demonstrate this with a simple word count job.
Let's consider the dataset below:
/public/randomtextwriter/part-m-00000
This dataset is about 1 GB, so by default Spark reads it from HDFS as an RDD with 9 partitions, since the default HDFS block size is 128 MB.
wordcounttext = sc.textFile("/public/randomtextwriter/part-m-00000") # rdd with 9 partitions
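You can confirm the partition count with getNumPartitions(); the value of 9 assumes the HDFS block layout described above.

wordcounttext.getNumPartitions()   # expected to return 9 for this file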
The usual word count logic using map() looks like this:
wordcounttext.flatMap(lambda x : x.split(" ")).map(lambda x : (x,1)).reduceByKey(lambda x,y : x+y)
Here the RDD 'wordcounttext' has 26421 records, so the lambda passed to flatMap() executes 26421 times, once per record.
But if we use mapPartitions() here, the function defined inside it executes only once per partition.
Each partition is handed to that function as a generator, which we can iterate to get the lines of records.
This generator can be processed with core Python APIs.
The above word count program can be written in two ways, as shown below.
Method 1:
wordcount = wordcounttext.mapPartitions(lambda x: map(lambda o: o.split(" "), x)).flatMap(lambda x: x).map(lambda x: (x, 1)).reduceByKey(add)
Method 2:
Here we first define a function that takes the partition generator as an argument and iterates through it using itertools.chain.from_iterable().
itertools.chain.from_iterable() behaves like Spark's flatMap, so for every partition (every generator) picked up by mapPartitions(), it converts the records into individual words, and each word is paired with 1. The output of this function for each partition is a map object, which is itself a lazy iterator, and that is what mapPartitions() consumes.
def getWordTuples(i):
    import itertools as it
    # split each line into words, flatten across the whole partition, then pair every word with 1
    wordTuples = map(lambda s: (s, 1), it.chain.from_iterable(map(lambda s: s.split(" "), i)))
    return wordTuples
wordTuples = wordcounttext.mapPartitions(getWordTuples)   # getWordTuples executes once per partition
wordcount = wordTuples.reduceByKey(add)
NOTE: For 'add' to work in reduceByKey(), we need to import it from the operator module as follows:
from operator import add
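For completeness, a minimal end-to-end run of method 2 looks like this (the output is not shown because it depends on the dataset):

from operator import add

wordcounttext = sc.textFile("/public/randomtextwriter/part-m-00000")
wordTuples = wordcounttext.mapPartitions(getWordTuples)   # runs once per partition
wordcount = wordTuples.reduceByKey(add)
wordcount.take(5)                                         # action that triggers the job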