pyspark : Dealing with partitions

This is a small article in which I try to explain how partitions are created, how to find out how many there are, how to control their number, and so on.

This blog is for :

pyspark (Spark with Python) analysts and anyone interested in learning pyspark.

Pre-requisites:

You should have a good knowledge of Python as well as a basic knowledge of pyspark functions.

Data set used:
For demonstration purposes, I am using the data set below (a file in HDFS):

"orderitems" with columns order_item, order_item_order_id, product_id, order_qty, order_item_subtotal, price_per_qty

If you read a file from HDFS into an RDD, by default Spark creates as many partitions as there are HDFS blocks to read.
The default block size of HDFS (2.x) is 128 MB, so a file smaller than this is read into an RDD as a single partition, since only one block is used.
A 256 MB file gives 2 partitions, and so on.
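As a rough worked example (a minimal sketch in plain Python; the 128 MB block size and the file sizes are assumptions for illustration only), the expected partition count is the file size divided by the block size, rounded up:

block_size_mb = 128                       # assumed HDFS 2.x default block size
for file_size_mb in (50, 128, 200, 256):  # hypothetical file sizes
    num_partitions = (file_size_mb + block_size_mb - 1) // block_size_mb  # ceiling division
    print("%d MB -> %d partition(s)" % (file_size_mb, num_partitions))
# 50 MB -> 1, 128 MB -> 1, 200 MB -> 2, 256 MB -> 2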

To find the current number of partitions of an RDD:

orderitems=sc.textFile("/public/retail_db/order_items")
orderitems.getNumPartitions()

To find the number of records in each partition, you can use:

orderitems.glom().map(lambda x: len(x)).collect() # collect() returns the record count of each partition as a list
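If you also want to see which partition each count belongs to, a minimal sketch using mapPartitionsWithIndex (the count_per_partition helper below is just an illustrative name) could look like this:

def count_per_partition(index, iterator):
    # yield one (partition index, record count) pair per partition
    yield (index, sum(1 for _ in iterator))

orderitems.mapPartitionsWithIndex(count_per_partition).collect()
# e.g. [(0, <records in partition 0>), (1, <records in partition 1>), ...]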

Now if I save it directly to HDFS:

orderitems.saveAsTextFile("/user/pathirippilly/spark_jobs/output_data_set/orderitesmpartitioned")

we can see that as many output files are created as there are partitions.
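A quick end-to-end check (a minimal sketch that reuses the paths above and assumes you have write access to the output directory):

orderitems = sc.textFile("/public/retail_db/order_items")
print(orderitems.getNumPartitions())   # e.g. 2

orderitems.saveAsTextFile("/user/pathirippilly/spark_jobs/output_data_set/orderitesmpartitioned")
# Listing this output directory in HDFS should show the same number of
# part-xxxxx files, plus a _SUCCESS marker.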

There are multiple ways to alter the default number of partitions.

method 1:
While reading a file into an RDD using sparkContext, you can pass the second argument, minPartitions, as below:
orderitems=sc.textFile("/public/retail_db/order_items",5) # here a minimum of 5 partitions is requested, even for a small file; a larger file may still get more
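You can verify the effect with getNumPartitions (a minimal sketch; the exact count depends on the file size):

orderitems = sc.textFile("/public/retail_db/order_items", 5)
print(orderitems.getNumPartitions())   # at least 5; a very large file may yield more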

method 2:
If you want to decrease the number of partitions, you can use coalesce.

orderitems=sc.textFile("/public/retail_db/order_items")
orderitems=orderitems.coalesce(1) # coalesce returns a new RDD with the number of partitions reduced to 1

note:
Coalesce avoids a full shuffle. If it is known that the number of partitions is decreasing, the executors can safely keep the data on the minimum number of partitions,
only moving the data off the extra partitions onto the partitions that are kept.
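A quick check of the effect (a minimal sketch; the initial partition count depends on your file):

orderitems = sc.textFile("/public/retail_db/order_items")
print(orderitems.getNumPartitions())              # e.g. 2
print(orderitems.coalesce(1).getNumPartitions())  # 1
# coalesce(n) will not raise the partition count above the current number
# unless shuffle=True is passed; to increase partitions, use repartition.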

method 3:

This uses repartition. It is slower and more expensive than coalesce because it performs a full shuffle, but it can either increase or decrease the number of partitions.
orderitems=sc.textFile("/public/retail_db/order_items")
orderitems=orderitems.repartition(5) # repartition returns a new RDD with 5 partitions

Note:

Keep in mind that repartitioning your data is a fairly expensive operation.  
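Putting the two together, here is a minimal sketch comparing coalesce and repartition on the same RDD (the initial partition count depends on your file):

orderitems = sc.textFile("/public/retail_db/order_items")
print(orderitems.getNumPartitions())                 # e.g. 2
print(orderitems.repartition(5).getNumPartitions())  # 5 (full shuffle)
print(orderitems.coalesce(1).getNumPartitions())     # 1 (no full shuffle)
# repartition(n) always shuffles the data; coalesce(n) only merges existing
# partitions, which is why it is the cheaper option when reducing partitions.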
