
Spark (With Python) : map() vs mapPartitions()

Beginners in Spark are often confused by the map() and mapPartitions() functions. This article is an attempt to resolve that confusion.

This blog is for :

pyspark (Spark with Python) analysts and anyone interested in learning pyspark.

Pre-requisites:

A good knowledge of Python and a basic knowledge of pyspark functions.



If you use map() over an RDD, the function passed to it runs once for every record. That means if you have 10M records, the function executes 10M times.
This gets expensive, especially in scenarios that involve connecting to a database and querying data from it.
Say that inside map() we call a function that connects to a database and queries it, and our RDD has 10M records.
That function will now execute 10M times, which means 10M database connections will be created. This is very expensive.
In such scenarios we can use mapPartitions() instead of map(): mapPartitions() runs the function only once per partition.
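
As a rough sketch of the database scenario (connect_to_db and query_db are hypothetical helpers, and input_rdd is just a placeholder RDD):

def enrich_partition(records):
    conn = connect_to_db()              # hypothetical: one connection per partition
    for record in records:              # 'records' is an iterator over the partition
        yield query_db(conn, record)    # reuse the same connection for every record
    conn.close()                        # runs after the partition is fully consumed

enriched = input_rdd.mapPartitions(enrich_partition)

With map(lambda record: query_db(connect_to_db(), record)), a connection would instead be created once per record.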

Example:

We can demonstrate this with a simple word count job.

Consider the dataset below:

/public/randomtextwriter/part-m-00000

This file is about 1 GB, so by default Spark reads it from HDFS as an RDD with 9 partitions, since the default HDFS block size is 128 MB.

wordcounttext = sc.textFile("/public/randomtextwriter/part-m-00000") # rdd with 9 partitions
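
You can confirm the partition count on the RDD itself:

wordcounttext.getNumPartitions()  # should return 9 for this file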

The normal word count logic using map() is as below:

wordcounttext.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

Here the RDD 'wordcounttext' has 26421 records, so the lambda passed to flatMap will execute 26421 times.

But if we use mapPartitions() here, the function passed to it will execute only once for every partition.
Each partition is handed to that function as a generator, which we can iterate to read the lines of records one by one,
so the partition can be processed entirely with core-Python APIs.
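
For instance, a quick way to see that the function receives one iterator per partition is to count the records in each partition (count_records below is just an illustrative helper):

def count_records(lines):
    yield sum(1 for _ in lines)   # one count per partition

wordcounttext.mapPartitions(count_records).collect()  # one number per partition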

The above word count program can be written using mapPartitions() in two ways, as below:

method1:

wordcount = wordcounttext.mapPartitions(lambda x: map(lambda o: o.split(" "), x)).flatMap(lambda x: x).map(lambda x: (x, 1)).reduceByKey(add)

Note that the map() inside mapPartitions() is Python's built-in map, not Spark's map() transformation; it splits each line of the partition into a list of words, and the following flatMap() flattens those lists.

method2:

Here we first define a function that consumes the partition's generator as an argument and iterates through it using itertools.chain.from_iterable().
itertools.chain.from_iterable() behaves like flatMap in Spark: for every partition (every generator) picked up by mapPartitions(), it flattens the lines
into independent words, and each word is then paired with '1'. The output of this function for each partition is a map object, which is itself a generator,
and that is what mapPartitions() consumes through the lambda passed to it.


def getWordTuples(i):
    import itertools as it
    # flatten every line of the partition into words, then pair each word with 1
    wordTuples = map(lambda s: (s, 1), it.chain.from_iterable(map(lambda s: s.split(" "), i)))
    return wordTuples

wordTuples = wordcounttext.mapPartitions(lambda i: getWordTuples(i)) # getWordTuples will execute once for every partition

wordcount=wordTuples.reduceByKey(add)

NOTE: For the 'add' function to work, we need to import it from the operator library as follows:
from operator import add
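
Putting method 2 together (take(3) is used here only to trigger the lazy pipeline; the actual words and counts depend on the input file):

from operator import add

wordTuples = wordcounttext.mapPartitions(getWordTuples)  # runs once per partition
wordcount = wordTuples.reduceByKey(add)
wordcount.take(3)                                        # triggers the job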
